Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): add retry policy support to kfp v2 #7867

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
## Breaking Changes

### For Pipeline Authors
* Add support for task-level retry policy [\#7867](https://github.com/kubeflow/pipelines/pull/7867)

### For Component Authors

Expand Down
33 changes: 33 additions & 0 deletions sdk/python/kfp/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,5 +994,38 @@ def load_compiled_file(filename: str) -> Dict[str, Any]:
return ignore_kfp_version_helper(contents)


class TestSetRetryCompilation(unittest.TestCase):
    """Tests that task-level retry settings survive compilation to IR."""

    def test_set_retry(self):

        @dsl.component
        def hello_world(text: str) -> str:
            """Hello world component."""
            return text

        @dsl.pipeline(name='hello-world', description='A simple intro pipeline')
        def pipeline_hello_world(text: str = 'hi there'):
            """Hello world pipeline."""

            hello_world(text=text).set_retry(
                num_retries=3,
                backoff_duration='30s',
                backoff_factor=1.0,
                backoff_max_duration='3h',
            )

        with tempfile.TemporaryDirectory() as tempdir:
            package_path = os.path.join(tempdir, 'pipeline.yaml')
            compiler.Compiler().compile(
                pipeline_func=pipeline_hello_world, package_path=package_path)
            pipeline_spec = pipeline_spec_from_file(package_path)

        # Durations are written to IR as whole seconds: '30s' -> 30 and
        # '3h' is capped at the 1 hour maximum -> 3600.
        retry_policy = pipeline_spec.root.dag.tasks['hello-world'].retry_policy
        self.assertEqual(retry_policy.max_retry_count, 3)
        self.assertEqual(retry_policy.backoff_duration.seconds, 30)
        self.assertEqual(retry_policy.backoff_factor, 1.0)
        self.assertEqual(retry_policy.backoff_max_duration.seconds, 3600)


if __name__ == '__main__':
    unittest.main()
6 changes: 5 additions & 1 deletion sdk/python/kfp/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2021 The Kubeflow Authors
# Copyright 2021-2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -133,6 +133,10 @@ def build_task_spec_for_task(
pipeline_task_spec.caching_options.enable_cache = (
task.task_spec.enable_caching)

if task.task_spec.retry_policy is not None:
pipeline_task_spec.retry_policy.CopyFrom(
task.task_spec.retry_policy.to_proto())

for input_name, input_value in task.inputs.items():
if isinstance(input_value, pipeline_channel.PipelineArtifactChannel):

Expand Down
33 changes: 33 additions & 0 deletions sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import compiler
from kfp import dsl


# Simple two-input component used to exercise set_retry on its task.
# NOTE(review): keep the function itself comment-free and byte-identical —
# its source is embedded verbatim in the compiled golden YAML, so any change
# inside the decorated function would break the golden-file comparison.
@dsl.component
def add(a: float, b: float) -> float:
    return a + b


@dsl.pipeline(name='test-pipeline')
def my_pipeline(a: float = 1, b: float = 7):
    # Single-task pipeline whose task retries up to 3 times on failure;
    # the remaining retry knobs fall back to their IR defaults.
    # No docstring on purpose: a docstring would become the pipeline
    # description in the compiled spec and diverge from the golden YAML.
    add_task = add(a=a, b=b)
    add_task.set_retry(num_retries=3)


if __name__ == '__main__':
    # Compile this sample next to the script:
    # pipeline_with_retry.py -> pipeline_with_retry.yaml (the golden file).
    compiler.Compiler().compile(
        pipeline_func=my_pipeline,
        package_path=__file__.replace('.py', '.yaml'))
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
components:
comp-add:
executorLabel: exec-add
inputDefinitions:
parameters:
a:
parameterType: NUMBER_DOUBLE
b:
parameterType: NUMBER_DOUBLE
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_DOUBLE
deploymentSpec:
executors:
exec-add:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- add
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-alpha.5'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)

printf "%s" "$0" > "$program_path/ephemeral_component.py"

python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef add(a: float, b: float) -> float:\n return a + b\n\n"
image: python:3.7
pipelineInfo:
name: test-pipeline
root:
dag:
tasks:
add:
cachingOptions:
enableCache: true
componentRef:
name: comp-add
inputs:
parameters:
a:
componentInputParameter: a
b:
componentInputParameter: b
retryPolicy:
backoffDuration: 0s
backoffFactor: 2.0
backoffMaxDuration: 3600s
maxRetryCount: 3
taskInfo:
name: add
inputDefinitions:
parameters:
a:
defaultValue: 1.0
parameterType: NUMBER_DOUBLE
b:
defaultValue: 7.0
parameterType: NUMBER_DOUBLE
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-alpha.5
24 changes: 24 additions & 0 deletions sdk/python/kfp/components/pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,30 @@ def set_memory_limit(self, memory: str) -> 'PipelineTask':

return self

def set_retry(self,
              num_retries: int,
              backoff_duration: Optional[str] = None,
              backoff_factor: Optional[float] = None,
              backoff_max_duration: Optional[str] = None) -> 'PipelineTask':
    """Sets task retry parameters.

    Args:
        num_retries (int): Number of times to retry on failure.
        backoff_duration (Optional[str]): Duration to wait before triggering
            a retry, e.g. '30s' or '1 hour'. Defaults to '0s' (immediate
            retry).
        backoff_factor (Optional[float]): The exponential backoff factor
            applied to backoff_duration. For example, if
            backoff_duration='60s' and backoff_factor=2, the first retry
            will happen after 60 seconds, then after 120, 240, and so on.
            Defaults to 2.0.
        backoff_max_duration (Optional[str]): The maximum duration during
            which the task will be retried. Maximum duration is 1 hour
            (3600s), which is also the default.

    Returns:
        Self return to allow chained setting calls.
    """
    self.task_spec.retry_policy = structures.RetryPolicy(
        max_retry_count=num_retries,
        backoff_duration=backoff_duration,
        backoff_factor=backoff_factor,
        backoff_max_duration=backoff_max_duration,
    )
    return self

def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask':
"""Sets accelerator type requirement for this task.

Expand Down
93 changes: 93 additions & 0 deletions sdk/python/kfp/components/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import collections
import functools
import itertools
import re
from typing import Any, Dict, List, Mapping, Optional, Union
import uuid

from google.protobuf import json_format
import kfp
from kfp import dsl
from kfp.compiler import compiler
Expand Down Expand Up @@ -236,6 +238,48 @@ def from_container_dict(cls, container_dict: Dict[str,
resources=None) # can only be set on tasks


class RetryPolicy(base_model.BaseModel):
    """The retry policy of a container execution.

    Attributes:
        max_retry_count (Optional[int]): Number of times to retry on failure.
        backoff_duration (Optional[str]): Duration to wait before triggering
            a retry (e.g. '30s', '1 hour'). Defaults to '0s' in the IR.
        backoff_factor (Optional[float]): The exponential backoff factor
            applied to backoff_duration. For example, if
            backoff_duration='60s' and backoff_factor=2, the first retry
            will happen after 60 seconds, then after 120, 240, and so on.
            Defaults to 2.0 in the IR.
        backoff_max_duration (Optional[str]): The maximum duration during
            which the task will be retried; capped at 1 hour (3600s) in the
            IR.
    """
    max_retry_count: Optional[int] = None
    backoff_duration: Optional[str] = None
    backoff_factor: Optional[float] = None
    backoff_max_duration: Optional[str] = None

    def to_proto(self) -> pipeline_spec_pb2.PipelineTaskSpec.RetryPolicy:
        """Converts this policy to its IR proto, filling in runtime defaults."""
        # Include defaults so that IR is more reflective of runtime behavior.
        max_retry_count = self.max_retry_count or 0
        backoff_duration = self.backoff_duration or '0s'
        backoff_factor = self.backoff_factor or 2.0
        backoff_max_duration = self.backoff_max_duration or '3600s'

        # Normalize durations to whole seconds and apply the 1 hour (3600s)
        # cap on the max duration so IR reflects runtime behavior.
        backoff_duration_seconds = f'{convert_duration_to_seconds(backoff_duration)}s'
        backoff_max_duration_seconds = f'{min(convert_duration_to_seconds(backoff_max_duration), 3600)}s'

        return json_format.ParseDict(
            {
                'max_retry_count': max_retry_count,
                'backoff_duration': backoff_duration_seconds,
                'backoff_factor': backoff_factor,
                'backoff_max_duration': backoff_max_duration_seconds,
            }, pipeline_spec_pb2.PipelineTaskSpec.RetryPolicy())

    @classmethod
    def from_proto(
        cls, retry_policy_proto: pipeline_spec_pb2.PipelineTaskSpec.RetryPolicy
    ) -> 'RetryPolicy':
        """Builds a RetryPolicy from its IR proto representation."""
        return cls.from_dict(
            json_format.MessageToDict(
                retry_policy_proto, preserving_proto_field_name=True))


class TaskSpec(base_model.BaseModel):
"""The spec of a pipeline task.

Expand Down Expand Up @@ -268,6 +312,7 @@ class TaskSpec(base_model.BaseModel):
iterator_item_input: Optional[str] = None
enable_caching: bool = True
display_name: Optional[str] = None
retry_policy: Optional[RetryPolicy] = None


class DagSpec(base_model.BaseModel):
Expand Down Expand Up @@ -698,3 +743,51 @@ def get_inputs(task_group: tasks_group.TasksGroup,
)

return pipeline_spec


def normalize_time_string(duration: str) -> str:
    """Normalizes a time string to '<integer><unit-letter>' form.

    Examples:
      - '1 hour' -> '1h'
      - '2 hours' -> '2h'
      - '2hours' -> '2h'
      - '2 w' -> '2w'
      - '2w' -> '2w'

    Args:
        duration (str): The unnormalized duration string.

    Returns:
        str: The normalized duration string.

    Raises:
        ValueError: If the string does not consist of exactly one integer
            value followed by one unit word.
    """
    no_ws_duration = duration.replace(' ', '')
    # Split into alternating runs of digits and non-digits; a well-formed
    # duration yields exactly [value, unit].
    duration_split = [el for el in re.split(r'(\D+)', no_ws_duration) if el]

    if len(duration_split) != 2:
        raise ValueError(
            f"Invalid duration string: '{duration}'. Expected one value (as integer in string) and one unit, such as '1 hour'."
        )

    value, unit = duration_split

    # Only the unit's first letter is significant ('hour' -> 'h'); lowercase
    # it so inputs such as '1 Hour' are accepted as well.
    return value + unit[0].lower()


def convert_duration_to_seconds(duration: str) -> int:
    """Converts a duration string to seconds.

    Args:
        duration (str): The unnormalized duration string (e.g. '1h',
            '1 hour', '2 hours', '2w', '2 weeks', '2d', etc.).

    Raises:
        ValueError: If the time unit is not one of seconds, minutes, hours,
            days, or weeks.

    Returns:
        int: The number of seconds in the duration.
    """
    duration = normalize_time_string(duration)
    seconds_per_unit = {'s': 1, 'm': 60, 'h': 3_600, 'd': 86_400, 'w': 604_800}
    unit = duration[-1]
    if unit not in seconds_per_unit:
        raise ValueError(
            f"Unsupported duration unit: '{unit}' for '{duration}'.")
    return int(duration[:-1]) * seconds_per_unit[unit]
Loading