From 4aea7199839c63fdd8ae84cbc707121c9f5b18fa Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Sat, 6 Jun 2020 10:56:03 -0700 Subject: [PATCH] feature; Capped retries for all failures and allowing backoff queue (#146) * Capped retries for all failures and allowing backoff queue * lint fix --- go.sum | 3 + pkg/controller/handler.go | 184 ++++++++++++++++------------ pkg/controller/handler_test.go | 61 ++++++++- pkg/controller/workflow/executor.go | 1 + 4 files changed, 167 insertions(+), 82 deletions(-) diff --git a/go.sum b/go.sum index 9facbe451c..e5036c9cb9 100644 --- a/go.sum +++ b/go.sum @@ -378,7 +378,9 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kubeflow/pytorch-operator v0.6.0 h1:y9Vzk7Jd5H/s610Y+ucURypCHgJugB25UL8GEz4DRL4= github.com/kubeflow/pytorch-operator v0.6.0/go.mod h1:zHblV+yTwVG4PCgKTU2wPfOmQ6TJdfT87lDfHrP1a1Y= +github.com/kubeflow/tf-operator v0.5.3 h1:Ejn5vEAwHBKHU2sJTlUIRpezqIX3WeqXZ2dZx6zn6vY= github.com/kubeflow/tf-operator v0.5.3/go.mod h1:EBtz5LQoKaHUl/5fV5vD1qXVNVNyn3TrFaH6eVoQ8SY= github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= @@ -393,6 +395,7 @@ github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes= github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.17.24 h1:N5mmk2/0062VjbIeUXLHWVZwkxGW20RdZtshaea2nL0= github.com/lyft/flyteidl v0.17.24/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.32 h1:Iio3gYjTyPhAiOMWJ/H/4YtfWIZm5KZSlWMULT1Ef6U= github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteplugins v0.3.23 h1:cN6d6f1ZkoHw+HD4wFCSVFVv+sCSeyx13E+hXIYEDzo= github.com/lyft/flyteplugins v0.3.23/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index 1ca1728906..0800029fee 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -10,6 +10,7 @@ import ( "github.com/lyft/flytestdlib/contextutils" "github.com/lyft/flytestdlib/promutils/labeled" + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/lyft/flytepropeller/pkg/controller/config" "github.com/lyft/flytepropeller/pkg/controller/workflowstore" @@ -47,6 +48,15 @@ func newPropellerMetrics(scope promutils.Scope) *propellerMetrics { } } +func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWorkflow { + // Let's mark these as system errors. + // We only want to increase failed attempts and discard any other partial changes to the CRD. 
+ wfDeepCopy := w.DeepCopy()
+ wfDeepCopy.GetExecutionStatus().IncFailedAttempts()
+ wfDeepCopy.GetExecutionStatus().SetMessage(err.Error())
+ return wfDeepCopy
+}
+
type Propeller struct {
wfStore workflowstore.FlyteWorkflow
workflowExecutor executors.Workflow
@@ -58,6 +68,68 @@ func (p *Propeller) Initialize(ctx context.Context) error {
return p.workflowExecutor.Initialize(ctx)
}
+func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) {
+
+ t := p.metrics.DeepCopyTime.Start()
+ mutableW := originalW.DeepCopy()
+ t.Stop()
+ ctx = contextutils.WithWorkflowID(ctx, mutableW.GetID())
+ if execID := mutableW.GetExecutionID(); execID.WorkflowExecutionIdentifier != nil {
+ ctx = contextutils.WithProjectDomain(ctx, mutableW.GetExecutionID().Project, mutableW.GetExecutionID().Domain)
+ }
+ ctx = contextutils.WithResourceVersion(ctx, mutableW.GetResourceVersion())
+
+ maxRetries := uint32(p.cfg.MaxWorkflowRetries)
+ if IsDeleted(mutableW) || (mutableW.Status.FailedAttempts > maxRetries) {
+ var err error
+ func() {
+ defer func() {
+ if r := recover(); r != nil {
+ stack := debug.Stack()
+ err = fmt.Errorf("panic when aborting workflow, Stack: [%s]", string(stack))
+ logger.Errorf(ctx, err.Error())
+ p.metrics.PanicObserved.Inc(ctx)
+ }
+ }()
+ err = p.workflowExecutor.HandleAbortedWorkflow(ctx, mutableW, maxRetries)
+ }()
+ if err != nil {
+ p.metrics.AbortError.Inc(ctx)
+ return nil, err
+ }
+ return mutableW, nil
+ }
+
+ if !mutableW.GetExecutionStatus().IsTerminated() {
+ var err error
+ SetFinalizerIfEmpty(mutableW, FinalizerKey)
+
+ func() {
+ t := p.metrics.RawWorkflowTraversalTime.Start(ctx)
+ defer func() {
+ t.Stop()
+ if r := recover(); r != nil {
+ stack := debug.Stack()
+ err = fmt.Errorf("panic when reconciling workflow, Stack: [%s]", string(stack))
+ logger.Errorf(ctx, err.Error())
+ p.metrics.PanicObserved.Inc(ctx)
+ }
+ }()
+ err = p.workflowExecutor.HandleFlyteWorkflow(ctx, mutableW)
+ }()
+
+ if err != nil {
+ logger.Errorf(ctx, "Error when trying to reconcile workflow. Error [%v]. Error Type [%v]",
+ err, reflect.TypeOf(err))
+ p.metrics.SystemError.Inc(ctx)
+ return nil, err
+ }
+ } else {
+ logger.Warn(ctx, "Workflow is marked as terminated but doesn't have the completed label, marking it as completed.")
+ }
+ return mutableW, nil
+}
+
// reconciler compares the actual state with the desired, and attempts to
// converge the two. It then updates the GetExecutionStatus block of the FlyteWorkflow resource
// with the current status of the resource. 
@@ -85,106 +157,62 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
defer logger.Infof(ctx, "Completed processing workflow.")
// Get the FlyteWorkflow resource with this namespace/name
- w, err := p.wfStore.Get(ctx, namespace, name)
- if err != nil {
- if workflowstore.IsNotFound(err) {
+ w, fetchErr := p.wfStore.Get(ctx, namespace, name)
+ if fetchErr != nil {
+ if workflowstore.IsNotFound(fetchErr) {
p.metrics.WorkflowNotFound.Inc()
logger.Warningf(ctx, "Workflow namespace[%v]/name[%v] not found, may be deleted.", namespace, name)
return nil
}
- if workflowstore.IsWorkflowStale(err) {
+ if workflowstore.IsWorkflowStale(fetchErr) {
p.metrics.RoundSkipped.Inc()
logger.Warningf(ctx, "Workflow namespace[%v]/name[%v] Stale.", namespace, name)
return nil
}
- logger.Warningf(ctx, "Failed to GetWorkflow, retrying with back-off", err)
- return err
+ logger.Warningf(ctx, "Failed to GetWorkflow, retrying with back-off. Error: %v", fetchErr)
+ return fetchErr
}
- t := p.metrics.DeepCopyTime.Start()
- wfDeepCopy := w.DeepCopy()
- t.Stop()
- ctx = contextutils.WithWorkflowID(ctx, wfDeepCopy.GetID())
- if execID := wfDeepCopy.GetExecutionID(); execID.WorkflowExecutionIdentifier != nil {
- ctx = contextutils.WithProjectDomain(ctx, wfDeepCopy.GetExecutionID().Project, wfDeepCopy.GetExecutionID().Domain)
+ if w.GetExecutionStatus().IsTerminated() {
+ if HasCompletedLabel(w) && !HasFinalizer(w) {
+ logger.Debugf(ctx, "Workflow is terminated.")
+ // This workflow had previously completed, let us ignore it
+ return nil
+ }
}
- ctx = contextutils.WithResourceVersion(ctx, wfDeepCopy.GetResourceVersion())
- maxRetries := uint32(p.cfg.MaxWorkflowRetries)
- if IsDeleted(wfDeepCopy) || (wfDeepCopy.Status.FailedAttempts > maxRetries) {
- var err error
- func() {
- defer func() {
- if r := recover(); r != nil {
- stack := debug.Stack()
- err = fmt.Errorf("panic when aborting workflow, Stack: [%s]", string(stack))
- logger.Errorf(ctx, err.Error())
- p.metrics.PanicObserved.Inc(ctx)
- }
- }()
- err = p.workflowExecutor.HandleAbortedWorkflow(ctx, wfDeepCopy, maxRetries)
- }()
- if err != nil {
- p.metrics.AbortError.Inc(ctx)
- return err
- }
+ mutatedWf, err := p.TryMutateWorkflow(ctx, w)
+ if err != nil {
+ // NOTE: We are overriding the deep copy here, as we are essentially ignoring all mutations.
+ // We only want to increase failed attempts and discard any other partial changes to the CRD. 
+ mutatedWf = RecordSystemError(w, err) + p.metrics.SystemError.Inc(ctx) + } else if mutatedWf == nil { + return nil } else { - if wfDeepCopy.GetExecutionStatus().IsTerminated() { - if HasCompletedLabel(wfDeepCopy) && !HasFinalizer(wfDeepCopy) { - logger.Debugf(ctx, "Workflow is terminated.") + if !w.GetExecutionStatus().IsTerminated() { + // No updates in the status we detected, we will skip writing to KubeAPI + if mutatedWf.Status.Equals(&w.Status) { + logger.Info(ctx, "WF hasn't been updated in this round.") return nil } - // NOTE: This should never really happen, but in case we externally mark the workflow as terminated - // We should allow cleanup - logger.Warn(ctx, "Workflow is marked as terminated but doesn't have the completed label, marking it as completed.") - } else { - SetFinalizerIfEmpty(wfDeepCopy, FinalizerKey) - - func() { - t := p.metrics.RawWorkflowTraversalTime.Start(ctx) - defer func() { - t.Stop() - if r := recover(); r != nil { - stack := debug.Stack() - err = fmt.Errorf("panic when reconciling workflow, Stack: [%s]", string(stack)) - logger.Errorf(ctx, err.Error()) - p.metrics.PanicObserved.Inc(ctx) - } - }() - err = p.workflowExecutor.HandleFlyteWorkflow(ctx, wfDeepCopy) - }() - - if err != nil { - logger.Errorf(ctx, "Error when trying to reconcile workflow. Error [%v]. Error Type[%v]. Is nill [%v]", - err, reflect.TypeOf(err)) - - // Let's mark these as system errors. - // We only want to increase failed attempts and discard any other partial changes to the CRD. - wfDeepCopy = w.DeepCopy() - wfDeepCopy.GetExecutionStatus().IncFailedAttempts() - wfDeepCopy.GetExecutionStatus().SetMessage(err.Error()) - p.metrics.SystemError.Inc(ctx) - } else { - // No updates in the status we detected, we will skip writing to KubeAPI - if wfDeepCopy.Status.Equals(&w.Status) { - logger.Info(ctx, "WF hasn't been updated in this round.") - return nil - } - } } - } - // If the end result is a terminated workflow, we remove the labels - if wfDeepCopy.GetExecutionStatus().IsTerminated() { - // We add a completed label so that we can avoid polling for this workflow - SetCompletedLabel(wfDeepCopy, time.Now()) - ResetFinalizers(wfDeepCopy) + if mutatedWf.GetExecutionStatus().IsTerminated() { + // If the end result is a terminated workflow, we remove the labels + // We add a completed label so that we can avoid polling for this workflow + SetCompletedLabel(mutatedWf, time.Now()) + ResetFinalizers(mutatedWf) + } } // TODO we will need to call updatestatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use update // update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not // allow changes to the Spec of the resource, which is ideal for ensuring // nothing other than resource status has been updated. 
- _, err = p.wfStore.Update(ctx, wfDeepCopy, workflowstore.PriorityClassCritical) + _, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical) + if updateErr != nil { + return updateErr + } return err } diff --git a/pkg/controller/handler_test.go b/pkg/controller/handler_test.go index 0a15e27cc6..2c93a5c22a 100644 --- a/pkg/controller/handler_test.go +++ b/pkg/controller/handler_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -51,6 +52,31 @@ func TestPropeller_Handle(t *testing.T) { assert.NoError(t, p.Handle(ctx, namespace, name)) }) + t.Run("terminated-and-finalized", func(t *testing.T) { + w := &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + Status: v1alpha1.WorkflowStatus{ + Phase: v1alpha1.WorkflowPhaseFailed, + }, + } + SetCompletedLabel(w, time.Now()) + assert.NoError(t, s.Create(ctx, w)) + assert.NoError(t, p.Handle(ctx, namespace, name)) + + r, err := s.Get(ctx, namespace, name) + assert.NoError(t, err) + assert.Equal(t, v1alpha1.WorkflowPhaseFailed, r.GetExecutionStatus().GetPhase()) + assert.Equal(t, 0, len(r.Finalizers)) + assert.True(t, HasCompletedLabel(r)) + assert.Equal(t, uint32(0), r.Status.FailedAttempts) + }) + t.Run("terminated", func(t *testing.T) { assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ ObjectMeta: v1.ObjectMeta{ @@ -71,6 +97,7 @@ func TestPropeller_Handle(t *testing.T) { assert.Equal(t, v1alpha1.WorkflowPhaseFailed, r.GetExecutionStatus().GetPhase()) assert.Equal(t, 0, len(r.Finalizers)) assert.True(t, HasCompletedLabel(r)) + assert.Equal(t, uint32(0), r.Status.FailedAttempts) }) t.Run("happy", func(t *testing.T) { @@ -94,6 +121,7 @@ func TestPropeller_Handle(t *testing.T) { assert.Equal(t, v1alpha1.WorkflowPhaseSucceeding, r.GetExecutionStatus().GetPhase()) assert.Equal(t, 1, len(r.Finalizers)) assert.False(t, HasCompletedLabel(r)) + assert.Equal(t, uint32(0), r.Status.FailedAttempts) }) t.Run("error", func(t *testing.T) { @@ -109,7 +137,7 @@ func TestPropeller_Handle(t *testing.T) { exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error { return fmt.Errorf("failed") } - assert.NoError(t, p.Handle(ctx, namespace, name)) + assert.Error(t, p.Handle(ctx, namespace, name)) r, err := s.Get(ctx, namespace, name) assert.NoError(t, err) @@ -147,6 +175,32 @@ func TestPropeller_Handle(t *testing.T) { assert.Equal(t, uint32(1), r.Status.FailedAttempts) }) + t.Run("abort-error", func(t *testing.T) { + assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + Status: v1alpha1.WorkflowStatus{ + FailedAttempts: 1, + Phase: v1alpha1.WorkflowPhaseRunning, + }, + })) + exec.HandleAbortedCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow, maxRetries uint32) error { + return fmt.Errorf("abort error") + } + assert.Error(t, p.Handle(ctx, namespace, name)) + r, err := s.Get(ctx, namespace, name) + assert.NoError(t, err) + assert.Equal(t, v1alpha1.WorkflowPhaseRunning, r.GetExecutionStatus().GetPhase()) + assert.Equal(t, 0, len(r.Finalizers)) + assert.False(t, HasCompletedLabel(r)) + assert.Equal(t, uint32(2), r.Status.FailedAttempts) + }) + t.Run("abort_panics", func(t *testing.T) { assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ ObjectMeta: v1.ObjectMeta{ @@ 
-168,12 +222,11 @@ func TestPropeller_Handle(t *testing.T) { assert.Error(t, p.Handle(ctx, namespace, name)) r, err := s.Get(ctx, namespace, name) - assert.NoError(t, err) assert.Equal(t, v1alpha1.WorkflowPhaseRunning, r.GetExecutionStatus().GetPhase()) assert.Equal(t, 1, len(r.Finalizers)) assert.False(t, HasCompletedLabel(r)) - assert.Equal(t, uint32(1), r.Status.FailedAttempts) + assert.Equal(t, uint32(2), r.Status.FailedAttempts) }) t.Run("noUpdate", func(t *testing.T) { @@ -220,7 +273,7 @@ func TestPropeller_Handle(t *testing.T) { exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error { panic("error") } - assert.NoError(t, p.Handle(ctx, namespace, name)) + assert.Error(t, p.Handle(ctx, namespace, name)) r, err := s.Get(ctx, namespace, name) assert.NoError(t, err) diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go index 1ae102c80b..06abfee7f4 100644 --- a/pkg/controller/workflow/executor.go +++ b/pkg/controller/workflow/executor.go @@ -428,6 +428,7 @@ func (c *workflowExecutor) HandleAbortedWorkflow(ctx context.Context, w *v1alpha // Best effort clean-up. if err2 := c.cleanupRunningNodes(ctx, w, reason); err2 != nil { logger.Errorf(ctx, "Failed to propagate Abort for workflow:%v. Error: %v", w.ExecutionID.WorkflowExecutionIdentifier, err2) + return err2 } var status Status
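
The control flow this patch introduces can be sketched outside the diff. The Go program below is a simplified, hypothetical illustration of the capped-retry behaviour, not the real flytepropeller code: the workflow struct, reconcileFn, and handleOnce names are stand-ins, and the loop in main plays the role of the controller workqueue redelivering an item. The idea it demonstrates is the one the patch encodes: every failed round increments FailedAttempts and returns the error so the caller can requeue with backoff, and once the count exceeds the configured cap the workflow is aborted instead of being retried again.

package main

import (
	"errors"
	"fmt"
)

// workflow is a stand-in for the FlyteWorkflow status tracked by the patch;
// only the fields needed to show the capped-retry decision are modeled.
type workflow struct {
	ID             string
	FailedAttempts uint32
	Aborted        bool
	Message        string
}

// reconcileFn is a stand-in for a reconciliation round (TryMutateWorkflow in the patch).
type reconcileFn func(w *workflow) error

// handleOnce loosely mirrors the shape of Propeller.Handle after this patch:
//   - if the retry budget is exhausted, abort instead of reconciling again;
//   - on a reconcile error, record it, bump FailedAttempts, and return the
//     error so the caller's workqueue can requeue the item with backoff.
func handleOnce(w *workflow, maxRetries uint32, reconcile reconcileFn) error {
	if w.FailedAttempts > maxRetries {
		w.Aborted = true
		return nil
	}
	if err := reconcile(w); err != nil {
		w.FailedAttempts++ // capped retries: every failure counts against the budget
		w.Message = err.Error()
		return err // returning the error lets the workqueue apply exponential backoff
	}
	return nil
}

func main() {
	wf := &workflow{ID: "w1"}
	failing := func(*workflow) error { return errors.New("transient system error") }

	const maxRetries = 2
	// Simulate the workqueue redelivering the item after each failed round.
	for round := 0; round < 5 && !wf.Aborted; round++ {
		if err := handleOnce(wf, maxRetries, failing); err != nil {
			fmt.Printf("round %d: requeue with backoff (failedAttempts=%d)\n", round, wf.FailedAttempts)
			continue
		}
		fmt.Printf("round %d: no requeue (aborted=%v)\n", round, wf.Aborted)
	}
}

Running the sketch prints three requeued rounds followed by an aborted one, which mirrors what the updated tests assert: a failing Handle call now both returns an error (so the item is backed off and retried) and leaves an incremented FailedAttempts behind on the stored workflow.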