From a903dc0b0a4059ad9e6923c34fb0f4d108e705b7 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Fri, 6 Oct 2023 11:48:51 -0700 Subject: [PATCH] Moving from flytepropeller - Adding flags for ignore-retry-cause and default-max-attempts (#4153) * added flags for ignore-retry-cause and default-max-attempts Signed-off-by: Daniel Rammer * Switch non interruptible @ unified retry behavior (#610) Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> * fixed lint issues Signed-off-by: Daniel Rammer * began unit tests Signed-off-by: Daniel Rammer * enabling negative interruptibleFailureThreshold Signed-off-by: Daniel Rammer * fixed unit tests Signed-off-by: Daniel Rammer * Finish simplify retry behavior PR (#623) * Cleanup retry behavior Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> * Fix interruptible retry threshold for odl behavior Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> * Add tests for BuildNodeExecutionContext Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> * Fix IsElgibileForRetries Tests Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> --------- Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> * updated flyteplugins Signed-off-by: Daniel Rammer * fixed lint issue Signed-off-by: Daniel Rammer * fixed monorepo transition Signed-off-by: Daniel Rammer --------- Signed-off-by: Daniel Rammer Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> Signed-off-by: Eduardo Apolinario Co-authored-by: Daniel Rammer Co-authored-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com> Co-authored-by: Eduardo Apolinario --- .../pluginmachinery/core/exec_metadata.go | 2 +- .../core/mocks/task_execution_metadata.go | 10 +-- .../plugins/array/k8s/subtask_exec_context.go | 2 +- ...ization.multi_images.my_workflow_2_wf.yaml | 4 - ...ontainerization.raw_container.wf_2_wf.yaml | 12 --- ...n.use_secrets.my_secret_workflow_2_wf.yaml | 6 -- ..._flow.chain_tasks.chain_tasks_wf_2_wf.yaml | 4 - ...ntrol_flow.conditions.multiplier_2_wf.yaml | 6 -- ...rol_flow.conditions.multiplier_2_2_wf.yaml | 6 -- ...rol_flow.conditions.multiplier_3_2_wf.yaml | 8 -- ...flow.conditions.basic_boolean_wf_2_wf.yaml | 8 -- ...ol_flow.conditions.bool_input_wf_2_wf.yaml | 6 -- ...low.conditions.nested_conditions_2_wf.yaml | 12 --- ..._flow.conditions.consume_outputs_2_wf.yaml | 10 --- ...48_core.control_flow.dynamics.wf_2_wf.yaml | 2 - ...ol_flow.map_task.my_map_workflow_2_wf.yaml | 2 - ...ntrol_flow.merge_sort.merge_sort_2_wf.yaml | 6 -- ...ntrol_flow.subworkflows.my_subwf_2_wf.yaml | 4 - ...l_flow.subworkflows.ext_workflow_2_wf.yaml | 2 - ...e.custom_task_plugin.my_workflow_2_wf.yaml | 2 - ...ore.extend_flyte.custom_types.wf_2_wf.yaml | 4 - ...lyte_basics.basic_workflow.my_wf_2_wf.yaml | 4 - ...flyte_basics.decorating_tasks.wf_2_wf.yaml | 4 - ...e_basics.decorating_workflows.wf_2_wf.yaml | 8 -- ...mented_workflow.sphinx_docstring_2_wf.yaml | 2 - ...umented_workflow.numpy_docstring_2_wf.yaml | 2 - ...mented_workflow.google_docstring_2_wf.yaml | 2 - ..._basics.files.normalize_csv_file_2_wf.yaml | 2 - ...download_and_normalize_csv_files_2_wf.yaml | 4 - ...e.flyte_basics.hello_world.my_wf_2_wf.yaml | 2 - ...7_my.imperative.workflow.example_2_wf.yaml | 6 -- .../120_core.flyte_basics.lp.my_wf_2_wf.yaml | 2 - ...25_core.flyte_basics.lp.go_greet_2_wf.yaml | 2 - ...flyte_basics.named_outputs.my_wf_2_wf.yaml | 4 - ..._core.flyte_basics.shell_task.wf_2_wf.yaml | 8 -- ...s.task_cache.cached_dataframe_wf_2_wf.yaml | 8 -- ...s.lp_schedules.date_formatter_wf_2_wf.yaml | 2 - ...rkflows.lp_schedules.positive_wf_2_wf.yaml | 2 - ...re.type_system.custom_objects.wf_2_wf.yaml | 10 --- ...6_core.type_system.enums.enum_wf_2_wf.yaml | 4 - ...type_system.flyte_pickle.welcome_2_wf.yaml | 2 - ...73_core.type_system.schema.df_wf_2_wf.yaml | 4 - ..._dataset.pandas_compatibility_wf_2_wf.yaml | 6 -- ..._dataset.schema_compatibility_wf_2_wf.yaml | 6 -- ...core.type_system.typed_schema.wf_2_wf.yaml | 4 - .../pkg/compiler/transformers/k8s/utils.go | 4 +- .../pkg/controller/config/config.go | 8 +- .../pkg/controller/config/config_flags.go | 4 +- .../controller/config/config_flags_test.go | 32 +++++++- .../pkg/controller/nodes/array/handler.go | 3 +- .../pkg/controller/nodes/executor.go | 25 ++++--- .../pkg/controller/nodes/executor_test.go | 52 +++++++++++++ .../mocks/node_execution_metadata.go | 10 +-- .../nodes/interfaces/node_exec_context.go | 2 +- .../pkg/controller/nodes/node_exec_context.go | 37 ++++++++-- .../nodes/node_exec_context_test.go | 74 +++++++++++++++++++ .../controller/nodes/task/taskexec_context.go | 4 +- 57 files changed, 228 insertions(+), 245 deletions(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go b/flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go index d5b8b81b5a..18138006ce 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go @@ -44,6 +44,6 @@ type TaskExecutionMetadata interface { GetSecurityContext() core.SecurityContext IsInterruptible() bool GetPlatformResources() *v1.ResourceRequirements - GetInterruptibleFailureThreshold() uint32 + GetInterruptibleFailureThreshold() int32 GetEnvironmentVariables() map[string]string } diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_metadata.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_metadata.go index 0e6651a2a7..b9115f00c6 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_metadata.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_metadata.go @@ -92,7 +92,7 @@ type TaskExecutionMetadata_GetInterruptibleFailureThreshold struct { *mock.Call } -func (_m TaskExecutionMetadata_GetInterruptibleFailureThreshold) Return(_a0 uint32) *TaskExecutionMetadata_GetInterruptibleFailureThreshold { +func (_m TaskExecutionMetadata_GetInterruptibleFailureThreshold) Return(_a0 int32) *TaskExecutionMetadata_GetInterruptibleFailureThreshold { return &TaskExecutionMetadata_GetInterruptibleFailureThreshold{Call: _m.Call.Return(_a0)} } @@ -107,14 +107,14 @@ func (_m *TaskExecutionMetadata) OnGetInterruptibleFailureThresholdMatch(matcher } // GetInterruptibleFailureThreshold provides a mock function with given fields: -func (_m *TaskExecutionMetadata) GetInterruptibleFailureThreshold() uint32 { +func (_m *TaskExecutionMetadata) GetInterruptibleFailureThreshold() int32 { ret := _m.Called() - var r0 uint32 - if rf, ok := ret.Get(0).(func() uint32); ok { + var r0 int32 + if rf, ok := ret.Get(0).(func() int32); ok { r0 = rf() } else { - r0 = ret.Get(0).(uint32) + r0 = ret.Get(0).(int32) } return r0 diff --git a/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go b/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go index 059e10b540..beeb1a7787 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go @@ -268,7 +268,7 @@ func NewSubTaskExecutionMetadata(taskExecutionMetadata pluginsCore.TaskExecution } subTaskExecutionID := NewSubTaskExecutionID(taskExecutionMetadata.GetTaskExecutionID(), executionIndex, retryAttempt) - interruptible := taskExecutionMetadata.IsInterruptible() && uint32(systemFailures) < taskExecutionMetadata.GetInterruptibleFailureThreshold() + interruptible := taskExecutionMetadata.IsInterruptible() && int32(systemFailures) < taskExecutionMetadata.GetInterruptibleFailureThreshold() return SubTaskExecutionMetadata{ taskExecutionMetadata, utils.UnionMaps(taskExecutionMetadata.GetAnnotations(), secretsMap), diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/002_core.containerization.multi_images.my_workflow_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/002_core.containerization.multi_images.my_workflow_2_wf.yaml index 12b97cfd0e..5e9ddd0b12 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/002_core.containerization.multi_images.my_workflow_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/002_core.containerization.multi_images.my_workflow_2_wf.yaml @@ -75,8 +75,6 @@ spec: kind: task name: svm_trainer resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.containerization.multi_images.svm_trainer" ' n1: id: n1 @@ -104,8 +102,6 @@ spec: kind: task name: svm_predictor resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.containerization.multi_images.svm_predictor" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/010_core.containerization.raw_container.wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/010_core.containerization.raw_container.wf_2_wf.yaml index d73c316018..550880e7e8 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/010_core.containerization.raw_container.wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/010_core.containerization.raw_container.wf_2_wf.yaml @@ -125,8 +125,6 @@ spec: kind: task name: ellipse-area-metadata-shell resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"ellipse-area-metadata-shell" ' n1: id: n1 @@ -144,8 +142,6 @@ spec: kind: task name: ellipse-area-metadata-python resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"ellipse-area-metadata-python" ' n2: id: n2 @@ -163,8 +159,6 @@ spec: kind: task name: ellipse-area-metadata-r resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"ellipse-area-metadata-r" ' n3: id: n3 @@ -182,8 +176,6 @@ spec: kind: task name: ellipse-area-metadata-haskell resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"ellipse-area-metadata-haskell" ' n4: id: n4 @@ -201,8 +193,6 @@ spec: kind: task name: ellipse-area-metadata-julia resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"ellipse-area-metadata-julia" ' n5: id: n5 @@ -260,8 +250,6 @@ spec: kind: task name: report_all_calculated_areas resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.containerization.raw_container.report_all_calculated_areas" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/015_core.containerization.use_secrets.my_secret_workflow_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/015_core.containerization.use_secrets.my_secret_workflow_2_wf.yaml index 8c7cfb7c88..359bb67831 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/015_core.containerization.use_secrets.my_secret_workflow_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/015_core.containerization.use_secrets.my_secret_workflow_2_wf.yaml @@ -107,24 +107,18 @@ spec: kind: task name: secret_task resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.containerization.use_secrets.secret_task" ' n1: id: n1 kind: task name: user_info_task resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.containerization.use_secrets.user_info_task" ' n2: id: n2 kind: task name: secret_file_task resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.containerization.use_secrets.secret_file_task" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/019_core.control_flow.chain_tasks.chain_tasks_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/019_core.control_flow.chain_tasks.chain_tasks_wf_2_wf.yaml index ad22eceb26..ec6ceff710 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/019_core.control_flow.chain_tasks.chain_tasks_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/019_core.control_flow.chain_tasks.chain_tasks_wf_2_wf.yaml @@ -75,16 +75,12 @@ spec: kind: task name: write resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.chain_tasks.write" ' n1: id: n1 kind: task name: read resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.chain_tasks.read" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/026_core.control_flow.conditions.multiplier_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/026_core.control_flow.conditions.multiplier_2_wf.yaml index 74116d674f..854e071a58 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/026_core.control_flow.conditions.multiplier_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/026_core.control_flow.conditions.multiplier_2_wf.yaml @@ -106,8 +106,6 @@ spec: kind: branch name: fractions resources: {} - retry: - minAttempts: 1 n0-n0: id: n0-n0 inputBindings: @@ -119,8 +117,6 @@ spec: kind: task name: double resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.double" ' n0-n1: id: n0-n1 @@ -133,8 +129,6 @@ spec: kind: task name: square resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.square" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/028_core.control_flow.conditions.multiplier_2_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/028_core.control_flow.conditions.multiplier_2_2_wf.yaml index b24ad357b3..7ec08f7c34 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/028_core.control_flow.conditions.multiplier_2_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/028_core.control_flow.conditions.multiplier_2_2_wf.yaml @@ -128,8 +128,6 @@ spec: kind: branch name: fractions resources: {} - retry: - minAttempts: 1 n0-n0: id: n0-n0 inputBindings: @@ -141,8 +139,6 @@ spec: kind: task name: double resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.double" ' n0-n1: id: n0-n1 @@ -155,8 +151,6 @@ spec: kind: task name: square resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.square" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/030_core.control_flow.conditions.multiplier_3_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/030_core.control_flow.conditions.multiplier_3_2_wf.yaml index 37e8b5f33f..10d028fa4a 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/030_core.control_flow.conditions.multiplier_3_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/030_core.control_flow.conditions.multiplier_3_2_wf.yaml @@ -134,8 +134,6 @@ spec: kind: branch name: fractions resources: {} - retry: - minAttempts: 1 n0-n0: id: n0-n0 inputBindings: @@ -147,8 +145,6 @@ spec: kind: task name: double resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.double" ' n0-n1: id: n0-n1 @@ -161,8 +157,6 @@ spec: kind: task name: square resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.square" ' n1: id: n1 @@ -175,8 +169,6 @@ spec: kind: task name: double resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.double" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/035_core.control_flow.conditions.basic_boolean_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/035_core.control_flow.conditions.basic_boolean_wf_2_wf.yaml index 3856ff37fc..f46b2040e3 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/035_core.control_flow.conditions.basic_boolean_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/035_core.control_flow.conditions.basic_boolean_wf_2_wf.yaml @@ -86,8 +86,6 @@ spec: kind: task name: coin_toss resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.coin_toss" ' n1: branch: @@ -111,23 +109,17 @@ spec: kind: branch name: test resources: {} - retry: - minAttempts: 1 n1-n0: id: n1-n0 kind: task name: success resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.success" ' n1-n1: id: n1-n1 kind: task name: failed resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.failed" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/037_core.control_flow.conditions.bool_input_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/037_core.control_flow.conditions.bool_input_wf_2_wf.yaml index 03ad38cd95..3e7d789001 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/037_core.control_flow.conditions.bool_input_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/037_core.control_flow.conditions.bool_input_wf_2_wf.yaml @@ -91,23 +91,17 @@ spec: kind: branch name: test resources: {} - retry: - minAttempts: 1 n0-n0: id: n0-n0 kind: task name: success resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.success" ' n0-n1: id: n0-n1 kind: task name: failed resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.failed" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/039_core.control_flow.conditions.nested_conditions_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/039_core.control_flow.conditions.nested_conditions_2_wf.yaml index 7c13f51deb..8579f95947 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/039_core.control_flow.conditions.nested_conditions_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/039_core.control_flow.conditions.nested_conditions_2_wf.yaml @@ -132,8 +132,6 @@ spec: kind: branch name: fractions resources: {} - retry: - minAttempts: 1 n0-n0: branch: elseFail: @@ -179,8 +177,6 @@ spec: kind: branch name: inner_fractions resources: {} - retry: - minAttempts: 1 n0-n0-n0: id: n0-n0-n0 inputBindings: @@ -192,8 +188,6 @@ spec: kind: task name: double resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.double" ' n0-n0-n1: id: n0-n0-n1 @@ -206,8 +200,6 @@ spec: kind: task name: square resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.square" ' n0-n1: id: n0-n1 @@ -220,8 +212,6 @@ spec: kind: task name: square resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.square" ' n0-n2: id: n0-n2 @@ -234,8 +224,6 @@ spec: kind: task name: double resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.double" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/042_core.control_flow.conditions.consume_outputs_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/042_core.control_flow.conditions.consume_outputs_2_wf.yaml index 68f4377264..24eadb558d 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/042_core.control_flow.conditions.consume_outputs_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/042_core.control_flow.conditions.consume_outputs_2_wf.yaml @@ -100,8 +100,6 @@ spec: kind: task name: coin_toss resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.coin_toss" ' n1: branch: @@ -125,8 +123,6 @@ spec: kind: branch name: double_or_square resources: {} - retry: - minAttempts: 1 n1-n0: id: n1-n0 inputBindings: @@ -138,8 +134,6 @@ spec: kind: task name: square resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.square" ' n1-n1: id: n1-n1 @@ -157,8 +151,6 @@ spec: kind: task name: calc_sum resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.calc_sum" ' n2: id: n2 @@ -171,8 +163,6 @@ spec: kind: task name: double resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.conditions.double" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/048_core.control_flow.dynamics.wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/048_core.control_flow.dynamics.wf_2_wf.yaml index c799c28ac9..c917877e0d 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/048_core.control_flow.dynamics.wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/048_core.control_flow.dynamics.wf_2_wf.yaml @@ -89,8 +89,6 @@ spec: kind: task name: count_characters resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.dynamics.count_characters" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/053_core.control_flow.map_task.my_map_workflow_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/053_core.control_flow.map_task.my_map_workflow_2_wf.yaml index 4c5a3f70d7..d5c29c9810 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/053_core.control_flow.map_task.my_map_workflow_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/053_core.control_flow.map_task.my_map_workflow_2_wf.yaml @@ -106,8 +106,6 @@ spec: kind: task name: coalesce resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.map_task.coalesce" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/059_core.control_flow.merge_sort.merge_sort_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/059_core.control_flow.merge_sort.merge_sort_2_wf.yaml index 62ca3c514a..d99831ef8e 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/059_core.control_flow.merge_sort.merge_sort_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/059_core.control_flow.merge_sort.merge_sort_2_wf.yaml @@ -110,8 +110,6 @@ spec: kind: branch name: terminal_case resources: {} - retry: - minAttempts: 1 n0-n0: id: n0-n0 inputBindings: @@ -123,8 +121,6 @@ spec: kind: task name: sort_locally resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.merge_sort.sort_locally" ' n0-n1: id: n0-n1 @@ -142,8 +138,6 @@ spec: kind: task name: merge_sort_remotely resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.merge_sort.merge_sort_remotely" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/062_core.control_flow.subworkflows.my_subwf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/062_core.control_flow.subworkflows.my_subwf_2_wf.yaml index c50e0ed0b1..05c5b6f080 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/062_core.control_flow.subworkflows.my_subwf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/062_core.control_flow.subworkflows.my_subwf_2_wf.yaml @@ -94,8 +94,6 @@ spec: kind: task name: t1 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.subworkflows.t1" ' n1: id: n1 @@ -108,8 +106,6 @@ spec: kind: task name: t1 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.subworkflows.t1" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/069_core.control_flow.subworkflows.ext_workflow_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/069_core.control_flow.subworkflows.ext_workflow_2_wf.yaml index 3c652334ad..44b6afc5fb 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/069_core.control_flow.subworkflows.ext_workflow_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/069_core.control_flow.subworkflows.ext_workflow_2_wf.yaml @@ -80,8 +80,6 @@ spec: kind: task name: count_freq_words resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.control_flow.subworkflows.count_freq_words" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/077_core.extend_flyte.custom_task_plugin.my_workflow_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/077_core.extend_flyte.custom_task_plugin.my_workflow_2_wf.yaml index 46106f72bd..d22d9eaf90 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/077_core.extend_flyte.custom_task_plugin.my_workflow_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/077_core.extend_flyte.custom_task_plugin.my_workflow_2_wf.yaml @@ -102,8 +102,6 @@ spec: kind: task name: print_file resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.extend_flyte.custom_task_plugin.print_file" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/081_core.extend_flyte.custom_types.wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/081_core.extend_flyte.custom_types.wf_2_wf.yaml index 61f35fdf31..56b6c88316 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/081_core.extend_flyte.custom_types.wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/081_core.extend_flyte.custom_types.wf_2_wf.yaml @@ -75,8 +75,6 @@ spec: kind: task name: generate resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.extend_flyte.custom_types.generate" ' n1: id: n1 @@ -89,8 +87,6 @@ spec: kind: task name: consume resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.extend_flyte.custom_types.consume" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/085_core.flyte_basics.basic_workflow.my_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/085_core.flyte_basics.basic_workflow.my_wf_2_wf.yaml index 1f214840b1..2471c19aac 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/085_core.flyte_basics.basic_workflow.my_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/085_core.flyte_basics.basic_workflow.my_wf_2_wf.yaml @@ -101,8 +101,6 @@ spec: kind: task name: t1 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.basic_workflow.t1" ' n1: id: n1 @@ -120,8 +118,6 @@ spec: kind: task name: t2 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.basic_workflow.t2" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/089_core.flyte_basics.decorating_tasks.wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/089_core.flyte_basics.decorating_tasks.wf_2_wf.yaml index 265d00d750..bb3d279c16 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/089_core.flyte_basics.decorating_tasks.wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/089_core.flyte_basics.decorating_tasks.wf_2_wf.yaml @@ -86,8 +86,6 @@ spec: kind: task name: t1 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.decorating_tasks.t1" ' n1: id: n1 @@ -100,8 +98,6 @@ spec: kind: task name: t2 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.decorating_tasks.t2" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/095_core.flyte_basics.decorating_workflows.wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/095_core.flyte_basics.decorating_workflows.wf_2_wf.yaml index 64c2d33bdc..690370bf14 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/095_core.flyte_basics.decorating_workflows.wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/095_core.flyte_basics.decorating_workflows.wf_2_wf.yaml @@ -98,8 +98,6 @@ spec: kind: task name: setup resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.decorating_workflows.setup" ' n1: id: n1 @@ -112,8 +110,6 @@ spec: kind: task name: t1 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.decorating_workflows.t1" ' n2: id: n2 @@ -126,16 +122,12 @@ spec: kind: task name: t2 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.decorating_workflows.t2" ' n3: id: n3 kind: task name: teardown resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.decorating_workflows.teardown" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/098_core.flyte_basics.documented_workflow.sphinx_docstring_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/098_core.flyte_basics.documented_workflow.sphinx_docstring_2_wf.yaml index bea325ce83..5c33c67395 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/098_core.flyte_basics.documented_workflow.sphinx_docstring_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/098_core.flyte_basics.documented_workflow.sphinx_docstring_2_wf.yaml @@ -90,8 +90,6 @@ spec: kind: task name: add_data resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.documented_workflow.add_data" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/100_core.flyte_basics.documented_workflow.numpy_docstring_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/100_core.flyte_basics.documented_workflow.numpy_docstring_2_wf.yaml index 5bb3618837..c1d2d357da 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/100_core.flyte_basics.documented_workflow.numpy_docstring_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/100_core.flyte_basics.documented_workflow.numpy_docstring_2_wf.yaml @@ -90,8 +90,6 @@ spec: kind: task name: add_data resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.documented_workflow.add_data" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/102_core.flyte_basics.documented_workflow.google_docstring_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/102_core.flyte_basics.documented_workflow.google_docstring_2_wf.yaml index d7cf5ac8b6..2c781af52e 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/102_core.flyte_basics.documented_workflow.google_docstring_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/102_core.flyte_basics.documented_workflow.google_docstring_2_wf.yaml @@ -90,8 +90,6 @@ spec: kind: task name: add_data resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.documented_workflow.add_data" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/105_core.flyte_basics.files.normalize_csv_file_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/105_core.flyte_basics.files.normalize_csv_file_2_wf.yaml index b165329444..326658e0c0 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/105_core.flyte_basics.files.normalize_csv_file_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/105_core.flyte_basics.files.normalize_csv_file_2_wf.yaml @@ -113,8 +113,6 @@ spec: kind: task name: normalize_columns resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.files.normalize_columns" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/109_core.flyte_basics.folders.download_and_normalize_csv_files_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/109_core.flyte_basics.folders.download_and_normalize_csv_files_2_wf.yaml index 791dc12b85..421c42d822 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/109_core.flyte_basics.folders.download_and_normalize_csv_files_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/109_core.flyte_basics.folders.download_and_normalize_csv_files_2_wf.yaml @@ -107,8 +107,6 @@ spec: kind: task name: download_files resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.folders.download_files" ' n1: id: n1 @@ -131,8 +129,6 @@ spec: kind: task name: normalize_all_files resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.folders.normalize_all_files" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/112_core.flyte_basics.hello_world.my_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/112_core.flyte_basics.hello_world.my_wf_2_wf.yaml index 9cea7205bc..18b5cd57a7 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/112_core.flyte_basics.hello_world.my_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/112_core.flyte_basics.hello_world.my_wf_2_wf.yaml @@ -69,8 +69,6 @@ spec: kind: task name: say_hello resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.hello_world.say_hello" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/117_my.imperative.workflow.example_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/117_my.imperative.workflow.example_2_wf.yaml index 7ef5b34aa2..9063363372 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/117_my.imperative.workflow.example_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/117_my.imperative.workflow.example_2_wf.yaml @@ -112,16 +112,12 @@ spec: kind: task name: t1 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.imperative_wf_style.t1" ' n1: id: n1 kind: task name: t2 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.imperative_wf_style.t2" ' n2: id: n2 @@ -139,8 +135,6 @@ spec: kind: task name: t3 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.imperative_wf_style.t3" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/120_core.flyte_basics.lp.my_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/120_core.flyte_basics.lp.my_wf_2_wf.yaml index 4d461d672b..9f2e119810 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/120_core.flyte_basics.lp.my_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/120_core.flyte_basics.lp.my_wf_2_wf.yaml @@ -80,8 +80,6 @@ spec: kind: task name: square resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.lp.square" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/125_core.flyte_basics.lp.go_greet_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/125_core.flyte_basics.lp.go_greet_2_wf.yaml index 0f6aa74dda..dfd0880492 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/125_core.flyte_basics.lp.go_greet_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/125_core.flyte_basics.lp.go_greet_2_wf.yaml @@ -98,8 +98,6 @@ spec: kind: task name: greet resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.lp.greet" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/129_core.flyte_basics.named_outputs.my_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/129_core.flyte_basics.named_outputs.my_wf_2_wf.yaml index 86638b623c..753d9aa567 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/129_core.flyte_basics.named_outputs.my_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/129_core.flyte_basics.named_outputs.my_wf_2_wf.yaml @@ -83,16 +83,12 @@ spec: kind: task name: say_hello resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.named_outputs.say_hello" ' n1: id: n1 kind: task name: say_hello resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.named_outputs.say_hello" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/140_core.flyte_basics.shell_task.wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/140_core.flyte_basics.shell_task.wf_2_wf.yaml index 3e8cbbce62..db69412129 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/140_core.flyte_basics.shell_task.wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/140_core.flyte_basics.shell_task.wf_2_wf.yaml @@ -93,8 +93,6 @@ spec: kind: task name: create_entities resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.shell_task.create_entities" ' n1: id: n1 @@ -107,8 +105,6 @@ spec: kind: task name: task_1 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"task_1" ' n2: id: n2 @@ -126,8 +122,6 @@ spec: kind: task name: task_2 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"task_2" ' n3: id: n3 @@ -150,8 +144,6 @@ spec: kind: task name: task_3 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"task_3" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/147_core.flyte_basics.task_cache.cached_dataframe_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/147_core.flyte_basics.task_cache.cached_dataframe_wf_2_wf.yaml index 765b2dd800..4029e11de6 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/147_core.flyte_basics.task_cache.cached_dataframe_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/147_core.flyte_basics.task_cache.cached_dataframe_wf_2_wf.yaml @@ -87,8 +87,6 @@ spec: kind: task name: uncached_data_reading_task resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.task_cache.uncached_data_reading_task" ' n1: id: n1 @@ -101,8 +99,6 @@ spec: kind: task name: cached_data_processing_task resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.task_cache.cached_data_processing_task" ' n2: id: n2 @@ -115,8 +111,6 @@ spec: kind: task name: cached_data_processing_task resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.task_cache.cached_data_processing_task" ' n3: id: n3 @@ -134,8 +128,6 @@ spec: kind: task name: compare_dataframes resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.flyte_basics.task_cache.compare_dataframes" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/151_core.scheduled_workflows.lp_schedules.date_formatter_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/151_core.scheduled_workflows.lp_schedules.date_formatter_wf_2_wf.yaml index dfeda53b81..3a7094a8e3 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/151_core.scheduled_workflows.lp_schedules.date_formatter_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/151_core.scheduled_workflows.lp_schedules.date_formatter_wf_2_wf.yaml @@ -74,8 +74,6 @@ spec: kind: task name: format_date resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.scheduled_workflows.lp_schedules.format_date" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/155_core.scheduled_workflows.lp_schedules.positive_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/155_core.scheduled_workflows.lp_schedules.positive_wf_2_wf.yaml index 92c721784f..64e74df112 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/155_core.scheduled_workflows.lp_schedules.positive_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/155_core.scheduled_workflows.lp_schedules.positive_wf_2_wf.yaml @@ -74,8 +74,6 @@ spec: kind: task name: be_positive resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.scheduled_workflows.lp_schedules.be_positive" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/162_core.type_system.custom_objects.wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/162_core.type_system.custom_objects.wf_2_wf.yaml index b114270796..300cf2aa39 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/162_core.type_system.custom_objects.wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/162_core.type_system.custom_objects.wf_2_wf.yaml @@ -116,8 +116,6 @@ spec: kind: task name: upload_result resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.custom_objects.upload_result" ' n1: id: n1 @@ -130,8 +128,6 @@ spec: kind: task name: download_result resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.custom_objects.download_result" ' n2: id: n2 @@ -144,8 +140,6 @@ spec: kind: task name: stringify resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.custom_objects.stringify" ' n3: id: n3 @@ -158,8 +152,6 @@ spec: kind: task name: stringify resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.custom_objects.stringify" ' n4: id: n4 @@ -177,8 +169,6 @@ spec: kind: task name: add resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.custom_objects.add" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/166_core.type_system.enums.enum_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/166_core.type_system.enums.enum_wf_2_wf.yaml index 5eb063803b..176d7c573b 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/166_core.type_system.enums.enum_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/166_core.type_system.enums.enum_wf_2_wf.yaml @@ -94,8 +94,6 @@ spec: kind: task name: enum_stringify resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.enums.enum_stringify" ' n1: id: n1 @@ -108,8 +106,6 @@ spec: kind: task name: string_to_enum resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.enums.string_to_enum" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/169_core.type_system.flyte_pickle.welcome_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/169_core.type_system.flyte_pickle.welcome_2_wf.yaml index 0bcc757aab..e43de40f8c 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/169_core.type_system.flyte_pickle.welcome_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/169_core.type_system.flyte_pickle.welcome_2_wf.yaml @@ -80,8 +80,6 @@ spec: kind: task name: greet resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.flyte_pickle.greet" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/173_core.type_system.schema.df_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/173_core.type_system.schema.df_wf_2_wf.yaml index cf16bac0b9..82aae32866 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/173_core.type_system.schema.df_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/173_core.type_system.schema.df_wf_2_wf.yaml @@ -86,8 +86,6 @@ spec: kind: task name: get_df resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.schema.get_df" ' n1: id: n1 @@ -100,8 +98,6 @@ spec: kind: task name: add_df resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.schema.add_df" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/179_core.type_system.structured_dataset.pandas_compatibility_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/179_core.type_system.structured_dataset.pandas_compatibility_wf_2_wf.yaml index 12b2789192..eae182adf4 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/179_core.type_system.structured_dataset.pandas_compatibility_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/179_core.type_system.structured_dataset.pandas_compatibility_wf_2_wf.yaml @@ -92,8 +92,6 @@ spec: kind: task name: get_df resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.structured_dataset.get_df" ' n1: id: n1 @@ -106,8 +104,6 @@ spec: kind: task name: get_subset_df resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.structured_dataset.get_subset_df" ' n2: id: n2 @@ -120,8 +116,6 @@ spec: kind: task name: to_numpy resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.structured_dataset.to_numpy" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/181_core.type_system.structured_dataset.schema_compatibility_wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/181_core.type_system.structured_dataset.schema_compatibility_wf_2_wf.yaml index bde2f2892e..af5ad985af 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/181_core.type_system.structured_dataset.schema_compatibility_wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/181_core.type_system.structured_dataset.schema_compatibility_wf_2_wf.yaml @@ -92,8 +92,6 @@ spec: kind: task name: get_schema_df resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.structured_dataset.get_schema_df" ' n1: id: n1 @@ -106,8 +104,6 @@ spec: kind: task name: get_subset_df resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.structured_dataset.get_subset_df" ' n2: id: n2 @@ -120,8 +116,6 @@ spec: kind: task name: to_numpy resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.structured_dataset.to_numpy" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/185_core.type_system.typed_schema.wf_2_wf.yaml b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/185_core.type_system.typed_schema.wf_2_wf.yaml index bec4de68e4..b935136947 100755 --- a/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/185_core.type_system.typed_schema.wf_2_wf.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/snacks-core/k8s/185_core.type_system.typed_schema.wf_2_wf.yaml @@ -75,8 +75,6 @@ spec: kind: task name: t1 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.typed_schema.t1" ' n1: id: n1 @@ -89,8 +87,6 @@ spec: kind: task name: t2 resources: {} - retry: - minAttempts: 1 task: 'resource_type:TASK name:"core.type_system.typed_schema.t2" ' start-node: id: start-node diff --git a/flytepropeller/pkg/compiler/transformers/k8s/utils.go b/flytepropeller/pkg/compiler/transformers/k8s/utils.go index 7669fe6cd1..963eefa614 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/utils.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/utils.go @@ -19,13 +19,13 @@ func refStr(s string) *string { } func computeRetryStrategy(n *core.Node, t *core.TaskTemplate) *v1alpha1.RetryStrategy { - if n.GetMetadata() != nil && n.GetMetadata().GetRetries() != nil { + if n.GetMetadata() != nil && n.GetMetadata().GetRetries() != nil && n.GetMetadata().GetRetries().Retries != 0 { return &v1alpha1.RetryStrategy{ MinAttempts: refInt(int(n.GetMetadata().GetRetries().Retries + 1)), } } - if t != nil && t.GetMetadata() != nil && t.GetMetadata().GetRetries() != nil { + if t != nil && t.GetMetadata() != nil && t.GetMetadata().GetRetries() != nil && t.GetMetadata().GetRetries().Retries != 0 { return &v1alpha1.RetryStrategy{ MinAttempts: refInt(int(t.GetMetadata().GetRetries().Retries + 1)), } diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 06becf1dd1..f84f656155 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -94,7 +94,9 @@ var ( }, NodeConfig: NodeConfig{ MaxNodeRetriesOnSystemFailures: 3, - InterruptibleFailureThreshold: 1, + InterruptibleFailureThreshold: -1, + DefaultMaxAttempts: 1, + IgnoreRetryCause: false, }, MaxStreakLength: 8, // Turbo mode is enabled by default ProfilerPort: config.Port{ @@ -203,7 +205,9 @@ type WorkqueueConfig struct { type NodeConfig struct { DefaultDeadlines DefaultDeadlines `json:"default-deadlines,omitempty" pflag:",Default value for timeouts"` MaxNodeRetriesOnSystemFailures int64 `json:"max-node-retries-system-failures" pflag:"2,Maximum number of retries per node for node failure due to infra issues"` - InterruptibleFailureThreshold int64 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible'"` + InterruptibleFailureThreshold int32 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. -1 means last attempt is non-interruptible).'"` + DefaultMaxAttempts int32 `json:"default-max-attempts" pflag:"3,Default maximum number of attempts for a node"` + IgnoreRetryCause bool `json:"ignore-retry-cause" pflag:",Ignore retry cause and count all attempts toward a node's max attempts"` } // DefaultDeadlines contains default values for timeouts diff --git a/flytepropeller/pkg/controller/config/config_flags.go b/flytepropeller/pkg/controller/config/config_flags.go index 6b718c73a5..3e9f72a965 100755 --- a/flytepropeller/pkg/controller/config/config_flags.go +++ b/flytepropeller/pkg/controller/config/config_flags.go @@ -93,7 +93,9 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "node-config.default-deadlines.node-active-deadline"), defaultConfig.NodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.String(), "Default value of node timeout that includes the time spent queued.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "node-config.default-deadlines.workflow-active-deadline"), defaultConfig.NodeConfig.DefaultDeadlines.DefaultWorkflowActiveDeadline.String(), "Default value of workflow timeout that includes the time spent queued.") cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "node-config.max-node-retries-system-failures"), defaultConfig.NodeConfig.MaxNodeRetriesOnSystemFailures, "Maximum number of retries per node for node failure due to infra issues") - cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "node-config.interruptible-failure-threshold"), defaultConfig.NodeConfig.InterruptibleFailureThreshold, "number of failures for a node to be still considered interruptible'") + cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "node-config.interruptible-failure-threshold"), defaultConfig.NodeConfig.InterruptibleFailureThreshold, "number of failures for a node to be still considered interruptible. Negative numbers are treated as complementary (ex. -1 means last attempt is non-interruptible).'") + cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "node-config.default-max-attempts"), defaultConfig.NodeConfig.DefaultMaxAttempts, "Default maximum number of attempts for a node") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "node-config.ignore-retry-cause"), defaultConfig.NodeConfig.IgnoreRetryCause, "Ignore retry cause and count all attempts toward a node's max attempts") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-streak-length"), defaultConfig.MaxStreakLength, "Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "event-config.raw-output-policy"), defaultConfig.EventConfig.RawOutputPolicy, "How output data should be passed along in execution events.") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "event-config.fallback-to-output-reference"), defaultConfig.EventConfig.FallbackToOutputReference, "Whether output data should be sent by reference when it is too large to be sent inline in execution events.") diff --git a/flytepropeller/pkg/controller/config/config_flags_test.go b/flytepropeller/pkg/controller/config/config_flags_test.go index 41dc1256f7..50d448d09f 100755 --- a/flytepropeller/pkg/controller/config/config_flags_test.go +++ b/flytepropeller/pkg/controller/config/config_flags_test.go @@ -707,8 +707,36 @@ func TestConfig_SetFlags(t *testing.T) { testValue := "1" cmdFlags.Set("node-config.interruptible-failure-threshold", testValue) - if vInt64, err := cmdFlags.GetInt64("node-config.interruptible-failure-threshold"); err == nil { - testDecodeJson_Config(t, fmt.Sprintf("%v", vInt64), &actual.NodeConfig.InterruptibleFailureThreshold) + if vInt32, err := cmdFlags.GetInt32("node-config.interruptible-failure-threshold"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt32), &actual.NodeConfig.InterruptibleFailureThreshold) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_node-config.default-max-attempts", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("node-config.default-max-attempts", testValue) + if vInt32, err := cmdFlags.GetInt32("node-config.default-max-attempts"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt32), &actual.NodeConfig.DefaultMaxAttempts) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_node-config.ignore-retry-cause", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("node-config.ignore-retry-cause", testValue) + if vBool, err := cmdFlags.GetBool("node-config.ignore-retry-cause"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.NodeConfig.IgnoreRetryCause) } else { assert.FailNow(t, err.Error()) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 2e4982927c..48735a9d2a 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -21,7 +21,6 @@ import ( "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/errors" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/handler" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" - "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/k8s" "github.com/flyteorg/flyte/flytestdlib/bitarray" @@ -213,7 +212,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } // initialize ArrayNode state - maxAttempts := task.DefaultMaxAttempts + maxAttempts := int(config.GetConfig().NodeConfig.DefaultMaxAttempts) subNodeSpec := *arrayNode.GetSubNodeSpec() if subNodeSpec.GetRetryStrategy() != nil && subNodeSpec.GetRetryStrategy().MinAttempts != nil { maxAttempts = *subNodeSpec.GetRetryStrategy().MinAttempts diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 69869934d0..3a03949c0c 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -484,7 +484,7 @@ type nodeExecutor struct { defaultExecutionDeadline time.Duration enqueueWorkflow v1alpha1.EnqueueWorkflow eventConfig *config.EventConfig - interruptibleFailureThreshold uint32 + interruptibleFailureThreshold int32 maxDatasetSizeBytes int64 maxNodeRetriesForSystemFailures uint32 metrics *nodeMetrics @@ -795,15 +795,20 @@ func isTimeoutExpired(queuedAt *metav1.Time, timeout time.Duration) bool { } func (c *nodeExecutor) isEligibleForRetry(nCtx interfaces.NodeExecutionContext, nodeStatus v1alpha1.ExecutableNodeStatus, err *core.ExecutionError) (currentAttempt, maxAttempts uint32, isEligible bool) { - if err.Kind == core.ExecutionError_SYSTEM { - currentAttempt = nodeStatus.GetSystemFailures() - maxAttempts = c.maxNodeRetriesForSystemFailures - isEligible = currentAttempt < c.maxNodeRetriesForSystemFailures - return - } + if config.GetConfig().NodeConfig.IgnoreRetryCause { + currentAttempt = nodeStatus.GetAttempts() + 1 + } else { + if err.Kind == core.ExecutionError_SYSTEM { + currentAttempt = nodeStatus.GetSystemFailures() + maxAttempts = c.maxNodeRetriesForSystemFailures + isEligible = currentAttempt < c.maxNodeRetriesForSystemFailures + return + } - currentAttempt = (nodeStatus.GetAttempts() + 1) - nodeStatus.GetSystemFailures() - if nCtx.Node().GetRetryStrategy() != nil && nCtx.Node().GetRetryStrategy().MinAttempts != nil { + currentAttempt = (nodeStatus.GetAttempts() + 1) - nodeStatus.GetSystemFailures() + } + maxAttempts = uint32(config.GetConfig().NodeConfig.DefaultMaxAttempts) + if nCtx.Node().GetRetryStrategy() != nil && nCtx.Node().GetRetryStrategy().MinAttempts != nil && *nCtx.Node().GetRetryStrategy().MinAttempts != 1 { maxAttempts = uint32(*nCtx.Node().GetRetryStrategy().MinAttempts) } isEligible = currentAttempt < maxAttempts @@ -1432,7 +1437,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora defaultExecutionDeadline: nodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.Duration, enqueueWorkflow: enQWorkflow, eventConfig: eventConfig, - interruptibleFailureThreshold: uint32(nodeConfig.InterruptibleFailureThreshold), + interruptibleFailureThreshold: nodeConfig.InterruptibleFailureThreshold, maxDatasetSizeBytes: maxDatasetSize, maxNodeRetriesForSystemFailures: uint32(nodeConfig.MaxNodeRetriesOnSystemFailures), metrics: metrics, diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go b/flytepropeller/pkg/controller/nodes/executor_test.go index 670f975716..0d67edcec5 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -2765,3 +2765,55 @@ func TestNodeExecutor_RecursiveNodeHandler_Cache(t *testing.T) { }) } } + +func TestNodeExecutor_IsEligibleForRetry(t *testing.T) { + tests := []struct { + name string + ignoreRetryCause bool + attempts uint32 + systemFailures uint32 + maxAttempts int32 + maxSystemFailures uint32 + errorKind core.ExecutionError_ErrorKind + expectedEligibility bool + }{ + {"EligibleUserRetries", false, 0, 0, 2, 0, core.ExecutionError_USER, true}, + {"IneligibleUserRetries", false, 1, 0, 2, 0, core.ExecutionError_USER, false}, + {"EligibleSystemRetries", false, 0, 0, 1, 1, core.ExecutionError_SYSTEM, true}, + {"IneligibleSystemRetries", false, 1, 1, 1, 1, core.ExecutionError_SYSTEM, false}, + {"IgnoreCauseEligibleUserRetries", true, 0, 0, 2, 0, core.ExecutionError_USER, true}, + {"IgnoreCauseIneligibleUserRetries", true, 1, 0, 2, 0, core.ExecutionError_USER, false}, + {"IgnoreCauseEligibleSystemRetries", true, 0, 0, 2, 0, core.ExecutionError_SYSTEM, true}, + {"IgnoreCauseIneligibleSystemRetries", true, 1, 1, 2, 0, core.ExecutionError_SYSTEM, false}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // mock all inputs + nodeExecutor := &nodeExecutor{ + maxNodeRetriesForSystemFailures: test.maxSystemFailures, + } + + config.GetConfig().NodeConfig.IgnoreRetryCause = test.ignoreRetryCause + config.GetConfig().NodeConfig.DefaultMaxAttempts = test.maxAttempts + + node := &mocks.ExecutableNode{} + node.OnGetRetryStrategy().Return(nil) + nCtx := &nodeExecContext{node: node} + + nodeStatus := &mocks.ExecutableNodeStatus{} + nodeStatus.OnGetAttempts().Return(test.attempts) + nodeStatus.OnGetSystemFailures().Return(test.systemFailures) + + err := &core.ExecutionError{ + Kind: test.errorKind, + } + + // validate eligibility + currentAttempt, maxAttempts, isEligible := nodeExecutor.isEligibleForRetry(nCtx, nodeStatus, err) + fmt.Println(currentAttempt, maxAttempts, isEligible) + assert.Equal(t, test.expectedEligibility, isEligible) + }) + } + +} diff --git a/flytepropeller/pkg/controller/nodes/interfaces/mocks/node_execution_metadata.go b/flytepropeller/pkg/controller/nodes/interfaces/mocks/node_execution_metadata.go index 5f96520b55..c78bdfb557 100644 --- a/flytepropeller/pkg/controller/nodes/interfaces/mocks/node_execution_metadata.go +++ b/flytepropeller/pkg/controller/nodes/interfaces/mocks/node_execution_metadata.go @@ -55,7 +55,7 @@ type NodeExecutionMetadata_GetInterruptibleFailureThreshold struct { *mock.Call } -func (_m NodeExecutionMetadata_GetInterruptibleFailureThreshold) Return(_a0 uint32) *NodeExecutionMetadata_GetInterruptibleFailureThreshold { +func (_m NodeExecutionMetadata_GetInterruptibleFailureThreshold) Return(_a0 int32) *NodeExecutionMetadata_GetInterruptibleFailureThreshold { return &NodeExecutionMetadata_GetInterruptibleFailureThreshold{Call: _m.Call.Return(_a0)} } @@ -70,14 +70,14 @@ func (_m *NodeExecutionMetadata) OnGetInterruptibleFailureThresholdMatch(matcher } // GetInterruptibleFailureThreshold provides a mock function with given fields: -func (_m *NodeExecutionMetadata) GetInterruptibleFailureThreshold() uint32 { +func (_m *NodeExecutionMetadata) GetInterruptibleFailureThreshold() int32 { ret := _m.Called() - var r0 uint32 - if rf, ok := ret.Get(0).(func() uint32); ok { + var r0 int32 + if rf, ok := ret.Get(0).(func() int32); ok { r0 = rf() } else { - r0 = ret.Get(0).(uint32) + r0 = ret.Get(0).(int32) } return r0 diff --git a/flytepropeller/pkg/controller/nodes/interfaces/node_exec_context.go b/flytepropeller/pkg/controller/nodes/interfaces/node_exec_context.go index dc7f3397bc..d22619a856 100644 --- a/flytepropeller/pkg/controller/nodes/interfaces/node_exec_context.go +++ b/flytepropeller/pkg/controller/nodes/interfaces/node_exec_context.go @@ -39,7 +39,7 @@ type NodeExecutionMetadata interface { GetK8sServiceAccount() string GetSecurityContext() core.SecurityContext IsInterruptible() bool - GetInterruptibleFailureThreshold() uint32 + GetInterruptibleFailureThreshold() int32 } type NodeExecutionContext interface { diff --git a/flytepropeller/pkg/controller/nodes/node_exec_context.go b/flytepropeller/pkg/controller/nodes/node_exec_context.go index 6a93678b75..96531a7e0a 100644 --- a/flytepropeller/pkg/controller/nodes/node_exec_context.go +++ b/flytepropeller/pkg/controller/nodes/node_exec_context.go @@ -94,7 +94,7 @@ type nodeExecMetadata struct { v1alpha1.Meta nodeExecID *core.NodeExecutionIdentifier interruptible bool - interruptibleFailureThreshold uint32 + interruptibleFailureThreshold int32 nodeLabels map[string]string } @@ -114,7 +114,7 @@ func (e nodeExecMetadata) IsInterruptible() bool { return e.interruptible } -func (e nodeExecMetadata) GetInterruptibleFailureThreshold() uint32 { +func (e nodeExecMetadata) GetInterruptibleFailureThreshold() int32 { return e.interruptibleFailureThreshold } @@ -208,7 +208,7 @@ func (e nodeExecContext) MaxDatasetSizeBytes() int64 { } func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup, - node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool, interruptibleFailureThreshold uint32, + node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool, interruptibleFailureThreshold int32, maxDatasetSize int64, taskEventRecorder events.TaskEventRecorder, nodeEventRecorder events.NodeEventRecorder, tr interfaces.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *nodeExecContext { @@ -255,6 +255,14 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext } } +func isAboveInterruptibleFailureThreshold(numFailures uint32, maxAttempts uint32, interruptibleThreshold int32) bool { + if interruptibleThreshold > 0 { + return numFailures >= uint32(interruptibleThreshold) + } + + return numFailures >= maxAttempts-uint32(-interruptibleThreshold) +} + func (c *nodeExecutor) BuildNodeExecutionContext(ctx context.Context, executionContext executors.ExecutionContext, nl executors.NodeLookup, currentNodeID v1alpha1.NodeID) (interfaces.NodeExecutionContext, error) { n, ok := nl.GetNode(currentNodeID) @@ -286,10 +294,25 @@ func (c *nodeExecutor) BuildNodeExecutionContext(ctx context.Context, executionC s := nl.GetNodeExecutionStatus(ctx, currentNodeID) - // a node is not considered interruptible if the system failures have exceeded the configured threshold - if interruptible && s.GetSystemFailures() >= c.interruptibleFailureThreshold { - interruptible = false - c.metrics.InterruptedThresholdHit.Inc(ctx) + if config.GetConfig().NodeConfig.IgnoreRetryCause { + // For the unified retry behavior we execute the last interruptibleFailureThreshold attempts on a non + // interruptible machine + maxAttempts := uint32(config.GetConfig().NodeConfig.DefaultMaxAttempts) + if n.GetRetryStrategy() != nil && n.GetRetryStrategy().MinAttempts != nil && *n.GetRetryStrategy().MinAttempts != 1 { + maxAttempts = uint32(*n.GetRetryStrategy().MinAttempts) + } + + // For interruptible nodes run at least one attempt on an interruptible machine (thus s.GetAttempts() > 0) even if there won't be any retries + if interruptible && s.GetAttempts() > 0 && isAboveInterruptibleFailureThreshold(s.GetAttempts(), maxAttempts, c.interruptibleFailureThreshold) { + interruptible = false + c.metrics.InterruptedThresholdHit.Inc(ctx) + } + } else { + // Else a node is not considered interruptible if the system failures have exceeded the configured threshold + if interruptible && isAboveInterruptibleFailureThreshold(s.GetSystemFailures(), c.maxNodeRetriesForSystemFailures+1, c.interruptibleFailureThreshold) { + interruptible = false + c.metrics.InterruptedThresholdHit.Inc(ctx) + } } rawOutputPrefix := c.defaultDataSandbox diff --git a/flytepropeller/pkg/controller/nodes/node_exec_context_test.go b/flytepropeller/pkg/controller/nodes/node_exec_context_test.go index 8948fc7a24..d96942c632 100644 --- a/flytepropeller/pkg/controller/nodes/node_exec_context_test.go +++ b/flytepropeller/pkg/controller/nodes/node_exec_context_test.go @@ -375,3 +375,77 @@ func Test_NodeContext_RecordTaskEvent(t1 *testing.T) { }) } } + +func Test_NodeContext_IsInterruptible(t *testing.T) { + ctx := context.Background() + + tests := []struct { + name string + ignoreRetryCause bool + attempts uint32 + systemFailures uint32 + maxAttempts int32 + maxSystemFailures uint32 + interruptibleFailureThreshold int32 + expectedInterruptible bool + }{ + {"Interruptible", false, 0, 0, 2, 1, 1, true}, + {"NonInterruptible", false, 0, 1, 2, 1, 1, false}, + {"InterruptibleNegativeThreshold", false, 0, 0, 2, 1, -1, true}, + {"InterruptibleNegativeThreshold2", false, 3, 3, 5, 4, -1, true}, + {"NonInterruptibleNegativeThreshold", false, 1, 1, 2, 1, -1, false}, + // maxSystemFailures should be ignored if ignoreRetryCause is true + {"IgnoreCauseInterruptible", true, 0, 0, 2, 999, 1, true}, + {"IgnoreCauseInterruptibleFirstTry", true, 0, 0, 1, 999, -1, true}, // First try should always be interruptible if interruptible is set + {"IgnoreCauseInterruptibleNegativeThreshold", true, 0, 0, 2, 999, -1, true}, + {"IgnoreCauseInterruptibleNegativeThreshold2", true, 2, 1, 4, 999, -1, true}, + {"IgnoreCauseNonInterruptibleSystem", true, 1, 1, 2, 999, 1, false}, + {"IgnoreCauseNonInterruptibleUser", true, 1, 0, 2, 999, 1, false}, + {"IgnoreCauseNonInterruptibleSystemNegativeThreshold", true, 3, 3, 4, 0, -1, false}, + {"IgnoreCauseNonInterruptibleUserNegativeThreshold", true, 3, 0, 4, 0, -1, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scope := promutils.NewTestScope() + + // mock all inputs + config.GetConfig().NodeConfig.DefaultMaxAttempts = tt.maxAttempts + config.GetConfig().NodeConfig.IgnoreRetryCause = tt.ignoreRetryCause + + dataStore, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, scope.NewSubScope("dataStore")) + nodeExecutor := nodeExecutor{ + interruptibleFailureThreshold: tt.interruptibleFailureThreshold, + maxNodeRetriesForSystemFailures: tt.maxSystemFailures, + maxDatasetSizeBytes: 0, + defaultDataSandbox: "s3://bucket-a", + store: dataStore, + shardSelector: ioutils.NewConstantShardSelector([]string{"x"}), + enqueueWorkflow: func(workflowID v1alpha1.WorkflowID) {}, + metrics: &nodeMetrics{ + InterruptibleNodesRunning: labeled.NewCounter("running", "xyz", scope.NewSubScope("interruptible1")), + InterruptibleNodesTerminated: labeled.NewCounter("terminated", "xyz", scope.NewSubScope("interruptible2")), + InterruptedThresholdHit: labeled.NewCounter("thresholdHit", "xyz", scope.NewSubScope("interruptible3")), + }, + } + + w := getTestFlyteWorkflow() + + nodeLookup := &mocks2.NodeLookup{} + interruptible := true + nodeLookup.OnGetNode("node-a").Return(getTestNodeSpec(&interruptible), true) + nodeLookup.OnGetNodeExecutionStatus(ctx, "node-a").Return(&v1alpha1.NodeStatus{ + Attempts: tt.attempts, + SystemFailures: tt.systemFailures, + }) + + p := parentInfo{} + execContext := executors.NewExecutionContext(w, w, w, p, nil) + + // validate interruptible + nCtx, err := nodeExecutor.BuildNodeExecutionContext(context.Background(), execContext, nodeLookup, "node-a") + assert.NoError(t, err) + assert.Equal(t, tt.expectedInterruptible, nCtx.NodeExecutionMetadata().IsInterruptible()) + }) + } +} diff --git a/flytepropeller/pkg/controller/nodes/task/taskexec_context.go b/flytepropeller/pkg/controller/nodes/task/taskexec_context.go index 45a981577f..0d9a226cf1 100644 --- a/flytepropeller/pkg/controller/nodes/task/taskexec_context.go +++ b/flytepropeller/pkg/controller/nodes/task/taskexec_context.go @@ -13,6 +13,7 @@ import ( "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + controllerconfig "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/common" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/errors" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" @@ -29,7 +30,6 @@ var ( ) const IDMaxLength = 50 -const DefaultMaxAttempts = 1 type taskExecutionID struct { execName string @@ -276,7 +276,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx interfaces.N } resourceNamespacePrefix := pluginCore.ResourceNamespace(t.resourceManager.GetID()).CreateSubNamespace(pluginCore.ResourceNamespace(plugin.GetID())) - maxAttempts := uint32(DefaultMaxAttempts) + maxAttempts := uint32(controllerconfig.GetConfig().NodeConfig.DefaultMaxAttempts) if nCtx.Node().GetRetryStrategy() != nil && nCtx.Node().GetRetryStrategy().MinAttempts != nil { maxAttempts = uint32(*nCtx.Node().GetRetryStrategy().MinAttempts) }