From 70d9c6f890003618d40740e3151305c67b70ca7b Mon Sep 17 00:00:00 2001
From: Paul Dittamo <37558497+pvditt@users.noreply.github.com>
Date: Tue, 23 Jan 2024 12:45:53 -0800
Subject: [PATCH] [BUG] Handle Potential Indefinite Propeller Update Loops (#4755)

* keep terminal phase on retry if already in terminal phase and updating CRD fails due to ErrWorkflowToLarge

Signed-off-by: Paul Dittamo

* update error message

Signed-off-by: Paul Dittamo

* ensure no finalizers set on retrying terminal wf update

Signed-off-by: Paul Dittamo

---------

Signed-off-by: Paul Dittamo
---
 flytepropeller/pkg/controller/handler.go      | 23 ++++++++++----
 flytepropeller/pkg/controller/handler_test.go | 30 ++++++++++++++++++-
 2 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/flytepropeller/pkg/controller/handler.go b/flytepropeller/pkg/controller/handler.go
index cc6b18e819..94e8ab6c12 100644
--- a/flytepropeller/pkg/controller/handler.go
+++ b/flytepropeller/pkg/controller/handler.go
@@ -385,11 +385,24 @@ func (p *Propeller) streak(ctx context.Context, w *v1alpha1.FlyteWorkflow, wfClo
 			// Workflow is too large, we will mark the workflow as failing and record it. This will automatically
 			// propagate the failure in the next round.
 			mutableW := w.DeepCopy()
-			mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{
-				Kind:    core.ExecutionError_SYSTEM,
-				Code:    "WorkflowTooLarge",
-				Message: "Workflow execution state is too large for Flyte to handle.",
-			})
+			// catch potential indefinite update loop
+			if mutatedWf.GetExecutionStatus().IsTerminated() {
+				ResetFinalizers(mutableW)
+				SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion)
+				SetCompletedLabel(mutableW, time.Now())
+				msg := fmt.Sprintf("Workflow size has breached threshold. Finalized with status: %v", mutatedWf.GetExecutionStatus().GetPhase())
+				mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailed, msg, &core.ExecutionError{
+					Kind:    core.ExecutionError_SYSTEM,
+					Code:    "WorkflowTooLarge",
+					Message: "Workflow execution state is too large for Flyte to handle.",
+				})
+			} else {
+				mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{
+					Kind:    core.ExecutionError_SYSTEM,
+					Code:    "WorkflowTooLarge",
+					Message: "Workflow execution state is too large for Flyte to handle.",
+				})
+			}
 			if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil {
 				logger.Errorf(ctx, "Failed recording a large workflow as failed, reason: %s. Retrying...", e)
 				return nil, e
diff --git a/flytepropeller/pkg/controller/handler_test.go b/flytepropeller/pkg/controller/handler_test.go
index ce1ca63818..3469c1da80 100644
--- a/flytepropeller/pkg/controller/handler_test.go
+++ b/flytepropeller/pkg/controller/handler_test.go
@@ -815,8 +815,36 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) {
 		}
 		s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil)
 		s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Once()
-		s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Once()
+		s.On("Update", mock.Anything, mock.MatchedBy(func(w *v1alpha1.FlyteWorkflow) bool {
+			return w.Status.Phase == v1alpha1.WorkflowPhaseFailing
+		}), mock.Anything).Return(nil, nil).Once()
 
+		err := p.Handle(ctx, namespace, name)
+		assert.NoError(t, err)
+	})
+	t.Run("too-large-terminal", func(t *testing.T) {
+		scope := promutils.NewTestScope()
+		s := &mocks.FlyteWorkflow{}
+		exec := &mockExecutor{}
+		p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope)
+		wf := &v1alpha1.FlyteWorkflow{
+			ObjectMeta: v1.ObjectMeta{
+				Name:      name,
+				Namespace: namespace,
+			},
+			WorkflowSpec: &v1alpha1.WorkflowSpec{
+				ID: "w1",
+			},
+		}
+		exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error {
+			w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseFailed, "done", nil)
+			return nil
+		}
+		s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil)
+		s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Once()
+		s.On("Update", mock.Anything, mock.MatchedBy(func(w *v1alpha1.FlyteWorkflow) bool {
+			return w.Status.Phase == v1alpha1.WorkflowPhaseFailed && !HasFinalizer(w) && HasCompletedLabel(w)
+		}), mock.Anything).Return(nil, nil).Once()
 		err := p.Handle(ctx, namespace, name)
 		assert.NoError(t, err)
 	})