feat(sdk): add retry policy support to kfp v2 (#7867)
* depend on kfp-pipeline-spec>=0.1.16

* implement RetryPolicy structure

* implement RetryPolicy structure tests

* implement .set_retry method on PipelineTask

* implement retry policy compilation logic

* implement compiler tests for retry

* implement pipeline with retry compilation test

* add release note

* clean up
connor-mccarthy authored Jul 11, 2022
1 parent 2d81e7b commit 850a750
Showing 9 changed files with 339 additions and 2 deletions.
1 change: 1 addition & 0 deletions sdk/RELEASE.md
@@ -5,6 +5,7 @@
## Breaking Changes

### For Pipeline Authors
* Add support for task-level retry policy [\#7867](https://github.com/kubeflow/pipelines/pull/7867)

### For Component Authors

33 changes: 33 additions & 0 deletions sdk/python/kfp/compiler/compiler_test.py
@@ -994,5 +994,38 @@ def load_compiled_file(filename: str) -> Dict[str, Any]:
return ignore_kfp_version_helper(contents)


class TestSetRetryCompilation(unittest.TestCase):

def test_set_retry(self):

@dsl.component
def hello_world(text: str) -> str:
"""Hello world component."""
return text

@dsl.pipeline(name='hello-world', description='A simple intro pipeline')
def pipeline_hello_world(text: str = 'hi there'):
"""Hello world pipeline."""

hello_world(text=text).set_retry(
num_retries=3,
backoff_duration='30s',
backoff_factor=1.0,
backoff_max_duration='3h',
)

with tempfile.TemporaryDirectory() as tempdir:
package_path = os.path.join(tempdir, 'pipeline.yaml')
compiler.Compiler().compile(
pipeline_func=pipeline_hello_world, package_path=package_path)
pipeline_spec = pipeline_spec_from_file(package_path)

retry_policy = pipeline_spec.root.dag.tasks['hello-world'].retry_policy
self.assertEqual(retry_policy.max_retry_count, 3)
self.assertEqual(retry_policy.backoff_duration.seconds, 30)
self.assertEqual(retry_policy.backoff_factor, 1.0)
self.assertEqual(retry_policy.backoff_max_duration.seconds, 3600)


if __name__ == '__main__':
unittest.main()
6 changes: 5 additions & 1 deletion sdk/python/kfp/compiler/pipeline_spec_builder.py
@@ -1,4 +1,4 @@
# Copyright 2021 The Kubeflow Authors
# Copyright 2021-2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -133,6 +133,10 @@ def build_task_spec_for_task(
pipeline_task_spec.caching_options.enable_cache = (
task.task_spec.enable_caching)

if task.task_spec.retry_policy is not None:
pipeline_task_spec.retry_policy.CopyFrom(
task.task_spec.retry_policy.to_proto())

for input_name, input_value in task.inputs.items():
if isinstance(input_value, pipeline_channel.PipelineArtifactChannel):

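For reference, a minimal sketch of the proto handoff performed in the hunk above. It assumes kfp-pipeline-spec>=0.1.16 (the version this PR depends on) and that pipeline_spec_pb2 is importable from kfp.pipeline_spec, as elsewhere in the SDK:

from google.protobuf import json_format
from kfp.pipeline_spec import pipeline_spec_pb2

# Build a RetryPolicy message the same way structures.RetryPolicy.to_proto() does.
retry_policy = json_format.ParseDict(
    {
        'max_retry_count': 3,
        'backoff_duration': '30s',
        'backoff_factor': 1.0,
        'backoff_max_duration': '3600s',
    }, pipeline_spec_pb2.PipelineTaskSpec.RetryPolicy())

# CopyFrom overwrites the task spec's retry_policy field with the message above.
pipeline_task_spec = pipeline_spec_pb2.PipelineTaskSpec()
pipeline_task_spec.retry_policy.CopyFrom(retry_policy)
assert pipeline_task_spec.retry_policy.max_retry_count == 3
assert pipeline_task_spec.retry_policy.backoff_duration.seconds == 30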
33 changes: 33 additions & 0 deletions sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_retry.py
@@ -0,0 +1,33 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import compiler
from kfp import dsl


@dsl.component
def add(a: float, b: float) -> float:
return a + b


@dsl.pipeline(name='test-pipeline')
def my_pipeline(a: float = 1, b: float = 7):
add_task = add(a=a, b=b)
add_task.set_retry(num_retries=3)


if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
package_path=__file__.replace('.py', '.yaml'))
74 changes: 74 additions & 0 deletions sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_retry.yaml
@@ -0,0 +1,74 @@
components:
comp-add:
executorLabel: exec-add
inputDefinitions:
parameters:
a:
parameterType: NUMBER_DOUBLE
b:
parameterType: NUMBER_DOUBLE
outputDefinitions:
parameters:
Output:
parameterType: NUMBER_DOUBLE
deploymentSpec:
executors:
exec-add:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- add
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-alpha.5'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef add(a: float, b: float) -> float:\n return a + b\n\n"
image: python:3.7
pipelineInfo:
name: test-pipeline
root:
dag:
tasks:
add:
cachingOptions:
enableCache: true
componentRef:
name: comp-add
inputs:
parameters:
a:
componentInputParameter: a
b:
componentInputParameter: b
retryPolicy:
backoffDuration: 0s
backoffFactor: 2.0
backoffMaxDuration: 3600s
maxRetryCount: 3
taskInfo:
name: add
inputDefinitions:
parameters:
a:
defaultValue: 1.0
parameterType: NUMBER_DOUBLE
b:
defaultValue: 7.0
parameterType: NUMBER_DOUBLE
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-alpha.5
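To confirm the defaults emitted above (only num_retries=3 was set in pipeline_with_retry.py; the remaining fields are filled in by RetryPolicy.to_proto, shown later in structures.py), here is a small sketch that assumes PyYAML is available and that the compiled file is named pipeline_with_retry.yaml:

import yaml

with open('pipeline_with_retry.yaml') as f:
    spec = yaml.safe_load(f)

retry = spec['root']['dag']['tasks']['add']['retryPolicy']
assert retry['maxRetryCount'] == 3
assert retry['backoffDuration'] == '0s'        # default: retry immediately
assert retry['backoffFactor'] == 2.0           # default exponential factor
assert retry['backoffMaxDuration'] == '3600s'  # default (and maximum) of one hour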
24 changes: 24 additions & 0 deletions sdk/python/kfp/components/pipeline_task.py
@@ -450,6 +450,30 @@ def set_memory_limit(self, memory: str) -> 'PipelineTask':

return self

def set_retry(self,
num_retries: int,
backoff_duration: Optional[str] = None,
backoff_factor: Optional[float] = None,
backoff_max_duration: Optional[str] = None) -> 'PipelineTask':
"""Sets task retry parameters.
Args:
num_retries (int): Number of times to retry on failure.
backoff_duration (Optional[str]): The duration to wait before triggering a retry, expressed as a duration string such as '30s' or '5m'. Defaults to '0s' (immediate retry).
backoff_factor (Optional[float]): The exponential backoff factor applied to backoff_duration. For example, if backoff_duration='60s' and backoff_factor=2, the first retry will happen after 60 seconds, then after 120, 240, and so on. Defaults to 2.0.
backoff_max_duration (Optional[str]): The maximum duration during which the task will be retried. Maximum duration is 1 hour (3600s). Defaults to '3600s'.
Returns:
Self return to allow chained setting calls.
"""
self.task_spec.retry_policy = structures.RetryPolicy(
max_retry_count=num_retries,
backoff_duration=backoff_duration,
backoff_factor=backoff_factor,
backoff_max_duration=backoff_max_duration,
)
return self

def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask':
"""Sets accelerator type requirement for this task.
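A minimal usage sketch of the new method from a pipeline author's perspective; the component and pipeline names below are illustrative, not part of this PR:

from kfp import dsl


@dsl.component
def flaky_step(text: str) -> str:
    return text


@dsl.pipeline(name='retry-example')
def retry_pipeline(text: str = 'hello'):
    # Retry up to 3 times, waiting 30s before the first retry and doubling
    # the wait on each subsequent retry, up to a 10 minute ceiling.
    flaky_step(text=text).set_retry(
        num_retries=3,
        backoff_duration='30s',
        backoff_factor=2.0,
        backoff_max_duration='10m',
    )

Any duration unit accepted by convert_duration_to_seconds (seconds, minutes, hours, days, weeks) can be used for the two duration arguments.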
93 changes: 93 additions & 0 deletions sdk/python/kfp/components/structures.py
Expand Up @@ -17,9 +17,11 @@
import collections
import functools
import itertools
import re
from typing import Any, Dict, List, Mapping, Optional, Union
import uuid

from google.protobuf import json_format
import kfp
from kfp import dsl
from kfp.compiler import compiler
@@ -236,6 +238,48 @@ def from_container_dict(cls, container_dict: Dict[str,
resources=None) # can only be set on tasks


class RetryPolicy(base_model.BaseModel):
"""The retry policy of a container execution.
Attributes:
max_retry_count (Optional[int]): Number of times to retry on failure.
backoff_duration (Optional[str]): The duration to wait before triggering a retry, expressed as a duration string such as '30s' or '5m'.
backoff_factor (Optional[float]): The exponential backoff factor applied to backoff_duration. For example, if backoff_duration='60s' and backoff_factor=2, the first retry will happen after 60 seconds, then after 120, 240, and so on.
backoff_max_duration (Optional[str]): The maximum duration during which the task will be retried. Maximum duration is 1 hour (3600s).
"""
max_retry_count: Optional[int] = None
backoff_duration: Optional[str] = None
backoff_factor: Optional[float] = None
backoff_max_duration: Optional[str] = None

def to_proto(self) -> pipeline_spec_pb2.PipelineTaskSpec.RetryPolicy:
# include defaults so that IR is more reflective of runtime behavior
max_retry_count = self.max_retry_count or 0
backoff_duration = self.backoff_duration or '0s'
backoff_factor = self.backoff_factor or 2.0
backoff_max_duration = self.backoff_max_duration or '3600s'

# include max duration seconds cap so that IR is more reflective of runtime behavior
backoff_duration_seconds = f'{convert_duration_to_seconds(backoff_duration)}s'
backoff_max_duration_seconds = f'{min(convert_duration_to_seconds(backoff_max_duration), 3600)}s'

return json_format.ParseDict(
{
'max_retry_count': max_retry_count,
'backoff_duration': backoff_duration_seconds,
'backoff_factor': backoff_factor,
'backoff_max_duration': backoff_max_duration_seconds,
}, pipeline_spec_pb2.PipelineTaskSpec.RetryPolicy())

@classmethod
def from_proto(
cls, retry_policy_proto: pipeline_spec_pb2.PipelineTaskSpec.RetryPolicy
) -> 'RetryPolicy':
return cls.from_dict(
json_format.MessageToDict(
retry_policy_proto, preserving_proto_field_name=True))


class TaskSpec(base_model.BaseModel):
"""The spec of a pipeline task.
@@ -268,6 +312,7 @@ class TaskSpec(base_model.BaseModel):
iterator_item_input: Optional[str] = None
enable_caching: bool = True
display_name: Optional[str] = None
retry_policy: Optional[RetryPolicy] = None


class DagSpec(base_model.BaseModel):
@@ -698,3 +743,51 @@ def get_inputs(task_group: tasks_group.TasksGroup,
)

return pipeline_spec


def normalize_time_string(duration: str) -> str:
"""Normalizes a time string.
Examples:
- '1 hour' -> '1h'
- '2 hours' -> '2h'
- '2hours' -> '2h'
- '2 w' -> '2w'
- '2w' -> '2w'
Args:
duration (str): The unnormalized duration string.
Returns:
str: The normalized duration string.
"""
no_ws_duration = duration.replace(' ', '')
duration_split = [el for el in re.split(r'(\D+)', no_ws_duration) if el]

if len(duration_split) != 2:
raise ValueError(
    f"Invalid duration string: '{duration}'. Expected a single integer value and a single unit, such as '1 hour'."
)

value = duration_split[0]
unit = duration_split[1]

first_letter_of_unit = unit[0]
return value + first_letter_of_unit


def convert_duration_to_seconds(duration: str) -> int:
"""Converts a duration string to seconds.
Args:
duration (str): The unnormalized duration string. (e.g. '1h', '1 hour', '2
hours', '2w', '2 weeks', '2d', etc.)
Raises:
ValueError: If the time unit is not one of seconds, minutes, hours, days,
or weeks.
Returns:
int: The number of seconds in the duration.
"""
duration = normalize_time_string(duration)
seconds_per_unit = {'s': 1, 'm': 60, 'h': 3_600, 'd': 86_400, 'w': 604_800}
if duration[-1] not in seconds_per_unit.keys():
raise ValueError(
f"Unsupported duration unit: '{duration[-1]}' for '{duration}'.")
return int(duration[:-1]) * seconds_per_unit[duration[-1]]
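
To make the defaulting and capping behavior concrete, a small sketch whose expected values follow the to_proto logic above; it assumes the module is importable as kfp.components.structures, per the file path:

from kfp.components import structures

# Only max_retry_count is set; to_proto fills in the documented defaults.
policy = structures.RetryPolicy(max_retry_count=3).to_proto()
assert policy.max_retry_count == 3
assert policy.backoff_duration.seconds == 0         # '0s' default
assert policy.backoff_factor == 2.0
assert policy.backoff_max_duration.seconds == 3600  # '3600s' default

# Durations longer than one hour are capped, matching the compiler test above.
capped = structures.RetryPolicy(
    max_retry_count=1, backoff_max_duration='3h').to_proto()
assert capped.backoff_max_duration.seconds == 3600

# The duration helpers accept loosely formatted strings.
assert structures.convert_duration_to_seconds('2 hours') == 7200
assert structures.convert_duration_to_seconds('90s') == 90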