From 54f82e20dfb207fcb6a7b7f1d6f24dd2a2ecbae6 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 20 Dec 2023 09:54:30 -0600 Subject: [PATCH 1/5] correctly computing the maximum number of attempts and system failures Signed-off-by: Daniel Rammer --- .../pkg/controller/nodes/array/handler.go | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index f1e2ef64fc..31e6e4937f 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -205,10 +205,16 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } // initialize ArrayNode state - maxAttempts := int(config.GetConfig().NodeConfig.DefaultMaxAttempts) - subNodeSpec := *arrayNode.GetSubNodeSpec() - if subNodeSpec.GetRetryStrategy() != nil && subNodeSpec.GetRetryStrategy().MinAttempts != nil { - maxAttempts = *subNodeSpec.GetRetryStrategy().MinAttempts + maxSystemFailuresValue := int(config.GetConfig().NodeConfig.MaxNodeRetriesOnSystemFailures) + maxAttemptsValue := int(config.GetConfig().NodeConfig.DefaultMaxAttempts) + if nCtx.Node().GetRetryStrategy() != nil && nCtx.Node().GetRetryStrategy().MinAttempts != nil && *nCtx.Node().GetRetryStrategy().MinAttempts != 1 { + maxAttemptsValue = int(*nCtx.Node().GetRetryStrategy().MinAttempts) + } + + if config.GetConfig().NodeConfig.IgnoreRetryCause { + maxSystemFailuresValue = maxAttemptsValue + } else { + maxAttemptsValue += maxSystemFailuresValue } for _, item := range []struct { @@ -219,8 +225,8 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // defined as an `iota` so it is impossible to programmatically get largest value {arrayReference: &arrayNodeState.SubNodePhases, maxValue: int(v1alpha1.NodePhaseRecovered)}, {arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1}, - {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: maxAttempts}, - {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: maxAttempts}, + {arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: maxAttemptsValue}, + {arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: maxSystemFailuresValue}, } { *item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) @@ -342,6 +348,8 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu if subNodeStatus.GetPhase() != nodeExecutionRequest.nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != nodeExecutionRequest.taskPhase { incrementTaskPhaseVersion = true } + + fmt.Printf("HAMERSAW - subnode:%d attempts:%d systemFailures:%d\n", index, subNodeStatus.GetAttempts(), subNodeStatus.GetSystemFailures()) } // if any workers failed then return the error From 47568dc99af7f29db6d85e1eb468078bac80fceb Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 20 Dec 2023 11:18:53 -0600 Subject: [PATCH 2/5] added unit tests Signed-off-by: Daniel Rammer --- .../controller/nodes/array/handler_test.go | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index b2e85c0979..39f0c1e196 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -654,6 +654,127 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { } } +func TestHandleArrayNodePhaseExecutingSubNodeFailures(t *testing.T) { + ctx := context.Background() + + inputValues := map[string][]int64{ + "foo": []int64{1}, + "bar": []int64{2}, + } + literalMap := convertMapToArrayLiterals(inputValues) + + // HAMERSAW + tests := []struct{ + name string + defaultMaxAttempts int32 + maxSystemFailures int64 + ignoreRetryCause bool + transition handler.Transition + expectedAttempts int + }{ + { + name: "UserFailure", + defaultMaxAttempts: 3, + maxSystemFailures: 10, + ignoreRetryCause: false, + transition: handler.DoTransition(handler.TransitionTypeEphemeral, + handler.PhaseInfoRetryableFailure(idlcore.ExecutionError_USER, "", "", &handler.ExecutionInfo{})), + expectedAttempts: 3, + }, + { + name: "SystemFailure", + defaultMaxAttempts: 3, + maxSystemFailures: 10, + ignoreRetryCause: false, + transition: handler.DoTransition(handler.TransitionTypeEphemeral, + handler.PhaseInfoRetryableFailure(idlcore.ExecutionError_SYSTEM, "", "", &handler.ExecutionInfo{})), + expectedAttempts: 11, + }, + { + name: "UserFailureIgnoreRetryCause", + defaultMaxAttempts: 3, + maxSystemFailures: 10, + ignoreRetryCause: true, + transition: handler.DoTransition(handler.TransitionTypeEphemeral, + handler.PhaseInfoRetryableFailure(idlcore.ExecutionError_USER, "", "", &handler.ExecutionInfo{})), + expectedAttempts: 3, + }, + { + name: "SystemFailureIgnoreRetryCause", + defaultMaxAttempts: 3, + maxSystemFailures: 10, + ignoreRetryCause: true, + transition: handler.DoTransition(handler.TransitionTypeEphemeral, + handler.PhaseInfoRetryableFailure(idlcore.ExecutionError_SYSTEM, "", "", &handler.ExecutionInfo{})), + expectedAttempts: 3, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + config.GetConfig().NodeConfig.DefaultMaxAttempts = test.defaultMaxAttempts + config.GetConfig().NodeConfig.MaxNodeRetriesOnSystemFailures = test.maxSystemFailures + config.GetConfig().NodeConfig.IgnoreRetryCause = test.ignoreRetryCause + + // create NodeExecutionContext + scope := promutils.NewTestScope() + dataStore, err := storage.NewDataStore(&storage.Config{ + Type: storage.TypeMemory, + }, scope) + eventRecorder := newBufferedEventRecorder() + arrayNodeState := &handler.ArrayNodeState{ + Phase: v1alpha1.ArrayNodePhaseNone, + } + nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState) + + // initialize ArrayNodeHandler + nodeHandler := &mocks.NodeHandler{} + nodeHandler.OnAbortMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) + nodeHandler.OnFinalizeMatch(mock.Anything, mock.Anything).Return(nil) + nodeHandler.OnFinalizeRequired().Return(false) + nodeHandler.OnHandleMatch(mock.Anything, mock.Anything).Return(test.transition, nil) + + arrayNodeHandler, err := createArrayNodeHandler(ctx, t, nodeHandler, dataStore, scope) + assert.NoError(t, err) + + // evaluate node to transition to Executing + _, err = arrayNodeHandler.Handle(ctx, nCtx) + assert.NoError(t, err) + assert.Equal(t, v1alpha1.ArrayNodePhaseExecuting, arrayNodeState.Phase) + + for i := 0; i < len(arrayNodeState.SubNodePhases.GetItems()); i++ { + arrayNodeState.SubNodePhases.SetItem(i, bitarray.Item(v1alpha1.NodePhaseRunning)) + } + + for i := 0; i < len(arrayNodeState.SubNodeTaskPhases.GetItems()); i++ { + arrayNodeState.SubNodeTaskPhases.SetItem(i, bitarray.Item(core.PhaseRunning)) + } + + // evaluate node until failure + attempts := 1 + for { + nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState) + _, err = arrayNodeHandler.Handle(ctx, nCtx) + assert.NoError(t, err) + + if arrayNodeState.Phase == v1alpha1.ArrayNodePhaseFailing { + break; + } + + // failing a task requires two calls to Handle, the first to return a + // RetryableFailure and the second to abort. therefore, we only increment the + // number of attempts once in this loop. + if arrayNodeState.SubNodePhases.GetItem(0) == bitarray.Item(v1alpha1.NodePhaseRetryableFailure) { + attempts++ + } + } + + assert.Equal(t, test.expectedAttempts, attempts) + }) + } +} + + func TestHandleArrayNodePhaseSucceeding(t *testing.T) { ctx := context.Background() scope := promutils.NewTestScope() From 34f73ce5ba240be204b605550d41b437e205961a Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 20 Dec 2023 11:20:27 -0600 Subject: [PATCH 3/5] fixed lint Signed-off-by: Daniel Rammer --- .../pkg/controller/nodes/array/handler.go | 2 +- .../controller/nodes/array/handler_test.go | 24 +++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 31e6e4937f..c24eb27a6d 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -208,7 +208,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu maxSystemFailuresValue := int(config.GetConfig().NodeConfig.MaxNodeRetriesOnSystemFailures) maxAttemptsValue := int(config.GetConfig().NodeConfig.DefaultMaxAttempts) if nCtx.Node().GetRetryStrategy() != nil && nCtx.Node().GetRetryStrategy().MinAttempts != nil && *nCtx.Node().GetRetryStrategy().MinAttempts != 1 { - maxAttemptsValue = int(*nCtx.Node().GetRetryStrategy().MinAttempts) + maxAttemptsValue = *nCtx.Node().GetRetryStrategy().MinAttempts } if config.GetConfig().NodeConfig.IgnoreRetryCause { diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index 39f0c1e196..168d545063 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -655,7 +655,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { } func TestHandleArrayNodePhaseExecutingSubNodeFailures(t *testing.T) { - ctx := context.Background() + ctx := context.Background() inputValues := map[string][]int64{ "foo": []int64{1}, @@ -664,7 +664,7 @@ func TestHandleArrayNodePhaseExecutingSubNodeFailures(t *testing.T) { literalMap := convertMapToArrayLiterals(inputValues) // HAMERSAW - tests := []struct{ + tests := []struct { name string defaultMaxAttempts int32 maxSystemFailures int64 @@ -677,36 +677,36 @@ func TestHandleArrayNodePhaseExecutingSubNodeFailures(t *testing.T) { defaultMaxAttempts: 3, maxSystemFailures: 10, ignoreRetryCause: false, - transition: handler.DoTransition(handler.TransitionTypeEphemeral, + transition: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure(idlcore.ExecutionError_USER, "", "", &handler.ExecutionInfo{})), - expectedAttempts: 3, + expectedAttempts: 3, }, { name: "SystemFailure", defaultMaxAttempts: 3, maxSystemFailures: 10, ignoreRetryCause: false, - transition: handler.DoTransition(handler.TransitionTypeEphemeral, + transition: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure(idlcore.ExecutionError_SYSTEM, "", "", &handler.ExecutionInfo{})), - expectedAttempts: 11, + expectedAttempts: 11, }, { name: "UserFailureIgnoreRetryCause", defaultMaxAttempts: 3, maxSystemFailures: 10, ignoreRetryCause: true, - transition: handler.DoTransition(handler.TransitionTypeEphemeral, + transition: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure(idlcore.ExecutionError_USER, "", "", &handler.ExecutionInfo{})), - expectedAttempts: 3, + expectedAttempts: 3, }, { name: "SystemFailureIgnoreRetryCause", defaultMaxAttempts: 3, maxSystemFailures: 10, ignoreRetryCause: true, - transition: handler.DoTransition(handler.TransitionTypeEphemeral, + transition: handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure(idlcore.ExecutionError_SYSTEM, "", "", &handler.ExecutionInfo{})), - expectedAttempts: 3, + expectedAttempts: 3, }, } @@ -721,6 +721,7 @@ func TestHandleArrayNodePhaseExecutingSubNodeFailures(t *testing.T) { dataStore, err := storage.NewDataStore(&storage.Config{ Type: storage.TypeMemory, }, scope) + assert.NoError(t, err) eventRecorder := newBufferedEventRecorder() arrayNodeState := &handler.ArrayNodeState{ Phase: v1alpha1.ArrayNodePhaseNone, @@ -758,7 +759,7 @@ func TestHandleArrayNodePhaseExecutingSubNodeFailures(t *testing.T) { assert.NoError(t, err) if arrayNodeState.Phase == v1alpha1.ArrayNodePhaseFailing { - break; + break } // failing a task requires two calls to Handle, the first to return a @@ -774,7 +775,6 @@ func TestHandleArrayNodePhaseExecutingSubNodeFailures(t *testing.T) { } } - func TestHandleArrayNodePhaseSucceeding(t *testing.T) { ctx := context.Background() scope := promutils.NewTestScope() From 30e902cdac3363a1dac710a8d4c0d16897ac8dc4 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 20 Dec 2023 11:21:14 -0600 Subject: [PATCH 4/5] removed debugging prints Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/nodes/array/handler.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index c24eb27a6d..7dcdef4749 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -348,8 +348,6 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu if subNodeStatus.GetPhase() != nodeExecutionRequest.nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != nodeExecutionRequest.taskPhase { incrementTaskPhaseVersion = true } - - fmt.Printf("HAMERSAW - subnode:%d attempts:%d systemFailures:%d\n", index, subNodeStatus.GetAttempts(), subNodeStatus.GetSystemFailures()) } // if any workers failed then return the error From 80f1ab76f258f7182dfb66320e923e8009f623db Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 20 Dec 2023 11:21:56 -0600 Subject: [PATCH 5/5] and again Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/nodes/array/handler_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index 168d545063..fbb5ae875c 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -663,7 +663,6 @@ func TestHandleArrayNodePhaseExecutingSubNodeFailures(t *testing.T) { } literalMap := convertMapToArrayLiterals(inputValues) - // HAMERSAW tests := []struct { name string defaultMaxAttempts int32