class TestSetRetryCompilation(unittest.TestCase):
    """Checks that retry settings on a task reach the compiled pipeline spec."""

    def test_set_retry(self):
        """Compiles a one-task pipeline and inspects the task's retry policy."""

        @dsl.component
        def hello_world(text: str) -> str:
            """Hello world component."""
            return text

        @dsl.pipeline(name='hello-world', description='A simple intro pipeline')
        def pipeline_hello_world(text: str = 'hi there'):
            """Hello world pipeline."""
            task = hello_world(text=text)
            task.set_retry(
                num_retries=3,
                backoff_duration='30s',
                backoff_factor=1.0,
                backoff_max_duration='3h',
            )

        with tempfile.TemporaryDirectory() as workdir:
            output_yaml = os.path.join(workdir, 'pipeline.yaml')
            compiler.Compiler().compile(
                pipeline_func=pipeline_hello_world, package_path=output_yaml)
            # Load the spec while the temporary directory still exists.
            compiled_spec = pipeline_spec_from_file(output_yaml)

        policy = compiled_spec.root.dag.tasks['hello-world'].retry_policy
        expected_max_duration_seconds = 3600  # expected value for the '3h' input
        self.assertEqual(policy.max_retry_count, 3)
        self.assertEqual(policy.backoff_duration.seconds, 30)
        self.assertEqual(policy.backoff_factor, 1.0)
        self.assertEqual(policy.backoff_max_duration.seconds,
                         expected_max_duration_seconds)
b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -1,4 +1,4 @@ -# Copyright 2021 The Kubeflow Authors +# Copyright 2021-2022 The Kubeflow Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -133,6 +133,10 @@ def build_task_spec_for_task( pipeline_task_spec.caching_options.enable_cache = ( task.task_spec.enable_caching) + if task.task_spec.retry_policy is not None: + pipeline_task_spec.retry_policy.CopyFrom( + task.task_spec.retry_policy.to_proto()) + for input_name, input_value in task.inputs.items(): if isinstance(input_value, pipeline_channel.PipelineArtifactChannel): diff --git a/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_retry.py b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_retry.py new file mode 100644 index 00000000000..3a2d8a506bc --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_retry.py @@ -0,0 +1,33 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from kfp import compiler
from kfp import dsl


@dsl.component
def add(a: float, b: float) -> float:
    return a + b


# Single-task pipeline: runs `add` once and configures retries on that task.
@dsl.pipeline(name='test-pipeline')
def my_pipeline(a: float = 1, b: float = 7):
    add_task = add(a=a, b=b)
    # Only num_retries is passed; the other retry settings are left unset.
    add_task.set_retry(num_retries=3)


if __name__ == '__main__':
    # Output path is this file's path with '.py' replaced by '.yaml'.
    compiler.Compiler().compile(
        pipeline_func=my_pipeline,
        package_path=__file__.replace('.py', '.yaml'))
def set_retry(self,
              num_retries: int,
              backoff_duration: Optional[str] = None,
              backoff_factor: Optional[float] = None,
              backoff_max_duration: Optional[str] = None) -> 'PipelineTask':
    """Sets task retry parameters.

    Duration arguments are strings made of one integer value and one unit
    (seconds, minutes, hours, days or weeks), for example '30s', '5 minutes'
    or '1h'. A bare number such as '60' is not a valid duration string.

    Args:
        num_retries (int): Number of times to retry on failure.
        backoff_duration (Optional[str]): How long to wait before triggering a retry, as a duration string. Defaults to '0s' (immediate retry).
        backoff_factor (Optional[float]): The exponential backoff factor applied to backoff_duration. For example, if backoff_duration='60s' and backoff_factor=2, the first retry will happen after 60 seconds, then after 120, 240, and so on. Defaults to 2.0.
        backoff_max_duration (Optional[str]): The maximum duration during which the task will be retried, as a duration string. Maximum duration is 1 hour (3600s). Defaults to '3600s'.

    Returns:
        Self return to allow chained setting calls.
    """
    # Values are stored as given; unset (None) settings stay None here.
    self.task_spec.retry_policy = structures.RetryPolicy(
        max_retry_count=num_retries,
        backoff_duration=backoff_duration,
        backoff_factor=backoff_factor,
        backoff_max_duration=backoff_max_duration,
    )
    return self
# Every accepted spelling of a supported unit, mapped to its length in seconds.
# Matching on the whole unit word (rather than its first letter) keeps
# look-alikes such as 'month' or 'ms' from being read as minutes.
_SECONDS_PER_UNIT_SPELLING = {
    spelling: seconds for spellings, seconds in (
        (('s', 'sec', 'secs', 'second', 'seconds'), 1),
        (('m', 'min', 'mins', 'minute', 'minutes'), 60),
        (('h', 'hr', 'hrs', 'hour', 'hours'), 3_600),
        (('d', 'day', 'days'), 86_400),
        (('w', 'wk', 'wks', 'week', 'weeks'), 604_800),
    ) for spelling in spellings
}


def _split_duration(duration: str) -> tuple:
    """Splits a duration string into its (value, unit) string parts.

    Args:
        duration (str): The duration string, e.g. '2 hours' or '30s'.
    Raises:
        ValueError: If the string is not exactly one run of digits followed
          by one unit (e.g. '1 day 1 hour', 'one hour', 'h1', '').
    Returns:
        tuple: The digits and the unit text, both as strings.
    """
    no_ws_duration = duration.replace(' ', '')
    # Exactly one integer value followed by exactly one non-digit unit.
    match = re.fullmatch(r'(\d+)(\D+)', no_ws_duration)
    if match is None:
        raise ValueError(
            f"Invalid duration string: '{duration}'. Expected one value (as integer in string) and one unit, such as '1 hour'."
        )
    return match.group(1), match.group(2)


def normalize_time_string(duration: str) -> str:
    """Normalizes a time string to '<value><first letter of unit>'.

    The unit itself is not validated here; use convert_duration_to_seconds
    when the unit must be one of the supported ones.

    Examples:
      - '1 hour' -> '1h'
      - '2 hours' -> '2h'
      - '2hours' -> '2h'
      - '2 w' -> '2w'
      - '2w' -> '2w'
    Args:
        duration (str): The unnormalized duration string.
    Raises:
        ValueError: If the string is not one integer value followed by one
          unit.
    Returns:
        str: The normalized duration string.
    """
    value, unit = _split_duration(duration)
    return value + unit[0]


def convert_duration_to_seconds(duration: str) -> int:
    """Converts a duration string to seconds.

    Args:
        duration (str): The unnormalized duration string. (e.g. '1h', '1 hour', '2
          hours', '2w', '2 weeks', '2d', etc.)
    Raises:
        ValueError: If the string is not one integer value followed by one
          unit, or if the time unit is not one of seconds, minutes, hours,
          days, or weeks.
    Returns:
        int: The number of seconds in the duration.
    """
    value, unit = _split_duration(duration)
    try:
        seconds_per_unit = _SECONDS_PER_UNIT_SPELLING[unit]
    except KeyError:
        raise ValueError(
            f"Unsupported duration unit: '{unit}' for '{duration}'.") from None
    return int(value) * seconds_per_unit
class TestRetryPolicy(unittest.TestCase):
    """Conversion checks between structures.RetryPolicy and its proto message."""

    def test_to_proto(self):
        """Durations are emitted in seconds and the max duration is capped."""
        retry_policy_struct = structures.RetryPolicy(
            max_retry_count=10,
            backoff_duration='1h',
            backoff_factor=1.5,
            backoff_max_duration='2 weeks')

        retry_policy_proto = retry_policy_struct.to_proto()
        self.assertEqual(retry_policy_proto.max_retry_count, 10)
        self.assertEqual(retry_policy_proto.backoff_duration.seconds, 3600)
        self.assertEqual(retry_policy_proto.backoff_factor, 1.5)
        # tests cap
        self.assertEqual(retry_policy_proto.backoff_max_duration.seconds, 3600)

    def test_from_proto(self):
        """A proto built from a dict loads back into matching struct fields."""
        retry_policy_proto = json_format.ParseDict(
            {
                'max_retry_count': 3,
                'backoff_duration': '5s',
                'backoff_factor': 1.0,
                'backoff_max_duration': '1s'
            }, pipeline_spec_pb2.PipelineTaskSpec.RetryPolicy())
        retry_policy_struct = structures.RetryPolicy.from_proto(
            retry_policy_proto)

        self.assertEqual(retry_policy_struct.max_retry_count, 3)
        self.assertEqual(retry_policy_struct.backoff_duration, '5s')
        self.assertEqual(retry_policy_struct.backoff_factor, 1.0)
        self.assertEqual(retry_policy_struct.backoff_max_duration, '1s')