feature: Capped retries for all failures and allowing backoff queue (flyteorg#146)

* Capped retries for all failures and allowing backoff queue

* lint fix
Ketan Umare authored Jun 6, 2020
1 parent 755687e commit 4aea719
Showing 4 changed files with 167 additions and 82 deletions.
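
In outline, the commit caps how many times a workflow is retried: every failure (including panics and other system errors) increments a FailedAttempts counter on the CRD status, and once the cap is exceeded the workflow is aborted instead of reconciled again. Errors are also returned from Handle so the item can be retried with backoff. A minimal Go sketch of that pattern, using hypothetical names rather than the actual Propeller types:

package main

import "fmt"

const maxRetries uint32 = 3

// workflow stands in for the FlyteWorkflow CRD status fields used here.
type workflow struct {
	FailedAttempts uint32
	Message        string
}

// handle mirrors the commit's control flow: abort once the retry budget is
// exhausted, otherwise reconcile; on error, record a system failure and
// return the error so the caller can requeue the item with backoff.
func handle(w *workflow, reconcile, abort func(*workflow) error) error {
	if w.FailedAttempts > maxRetries {
		return abort(w)
	}
	if err := reconcile(w); err != nil {
		w.FailedAttempts++
		w.Message = err.Error()
		return err
	}
	return nil
}

func main() {
	w := &workflow{}
	_ = handle(w, func(*workflow) error { return fmt.Errorf("transient failure") }, nil)
	fmt.Println(w.FailedAttempts) // 1
}

The diff below shows the real version: the abort path goes through HandleAbortedWorkflow, and RecordSystemError discards partial mutations before bumping the counter.
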
3 changes: 3 additions & 0 deletions go.sum
@@ -378,7 +378,9 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kubeflow/pytorch-operator v0.6.0 h1:y9Vzk7Jd5H/s610Y+ucURypCHgJugB25UL8GEz4DRL4=
github.com/kubeflow/pytorch-operator v0.6.0/go.mod h1:zHblV+yTwVG4PCgKTU2wPfOmQ6TJdfT87lDfHrP1a1Y=
github.com/kubeflow/tf-operator v0.5.3 h1:Ejn5vEAwHBKHU2sJTlUIRpezqIX3WeqXZ2dZx6zn6vY=
github.com/kubeflow/tf-operator v0.5.3/go.mod h1:EBtz5LQoKaHUl/5fV5vD1qXVNVNyn3TrFaH6eVoQ8SY=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
@@ -393,6 +395,7 @@ github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes=
github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.24 h1:N5mmk2/0062VjbIeUXLHWVZwkxGW20RdZtshaea2nL0=
github.com/lyft/flyteidl v0.17.24/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.32 h1:Iio3gYjTyPhAiOMWJ/H/4YtfWIZm5KZSlWMULT1Ef6U=
github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.23 h1:cN6d6f1ZkoHw+HD4wFCSVFVv+sCSeyx13E+hXIYEDzo=
github.com/lyft/flyteplugins v0.3.23/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A=
184 changes: 106 additions & 78 deletions pkg/controller/handler.go
@@ -10,6 +10,7 @@ import (
"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/promutils/labeled"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/config"
"github.com/lyft/flytepropeller/pkg/controller/workflowstore"

@@ -47,6 +48,15 @@ func newPropellerMetrics(scope promutils.Scope) *propellerMetrics {
}
}

func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWorkflow {
// Let's mark these as system errors.
// We only want to increase failed attempts and discard any other partial changes to the CRD.
wfDeepCopy := w.DeepCopy()
wfDeepCopy.GetExecutionStatus().IncFailedAttempts()
wfDeepCopy.GetExecutionStatus().SetMessage(err.Error())
return wfDeepCopy
}

type Propeller struct {
wfStore workflowstore.FlyteWorkflow
workflowExecutor executors.Workflow
@@ -58,6 +68,68 @@ func (p *Propeller) Initialize(ctx context.Context) error {
return p.workflowExecutor.Initialize(ctx)
}

func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) {

t := p.metrics.DeepCopyTime.Start()
mutableW := originalW.DeepCopy()
t.Stop()
ctx = contextutils.WithWorkflowID(ctx, mutableW.GetID())
if execID := mutableW.GetExecutionID(); execID.WorkflowExecutionIdentifier != nil {
ctx = contextutils.WithProjectDomain(ctx, mutableW.GetExecutionID().Project, mutableW.GetExecutionID().Domain)
}
ctx = contextutils.WithResourceVersion(ctx, mutableW.GetResourceVersion())

maxRetries := uint32(p.cfg.MaxWorkflowRetries)
if IsDeleted(mutableW) || (mutableW.Status.FailedAttempts > maxRetries) {
var err error
func() {
defer func() {
if r := recover(); r != nil {
stack := debug.Stack()
err = fmt.Errorf("panic when aborting workflow, Stack: [%s]", string(stack))
logger.Errorf(ctx, err.Error())
p.metrics.PanicObserved.Inc(ctx)
}
}()
err = p.workflowExecutor.HandleAbortedWorkflow(ctx, mutableW, maxRetries)
}()
if err != nil {
p.metrics.AbortError.Inc(ctx)
return nil, err
}
return mutableW, nil
}

if !mutableW.GetExecutionStatus().IsTerminated() {
var err error
SetFinalizerIfEmpty(mutableW, FinalizerKey)

func() {
t := p.metrics.RawWorkflowTraversalTime.Start(ctx)
defer func() {
t.Stop()
if r := recover(); r != nil {
stack := debug.Stack()
err = fmt.Errorf("panic when reconciling workflow, Stack: [%s]", string(stack))
logger.Errorf(ctx, err.Error())
p.metrics.PanicObserved.Inc(ctx)
}
}()
err = p.workflowExecutor.HandleFlyteWorkflow(ctx, mutableW)
}()

if err != nil {
logger.Errorf(ctx, "Error when trying to reconcile workflow. Error [%v]. Error Type[%v]. Is nil [%v]",
err, reflect.TypeOf(err), err == nil)
p.metrics.SystemError.Inc(ctx)
return nil, err
}
} else {
logger.Warn(ctx, "Workflow is marked as terminated but doesn't have the completed label, marking it as completed.")
}
return mutableW, nil
}

// reconciler compares the actual state with the desired, and attempts to
// converge the two. It then updates the GetExecutionStatus block of the FlyteWorkflow resource
// with the current status of the resource.
@@ -85,106 +157,62 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
defer logger.Infof(ctx, "Completed processing workflow.")

// Get the FlyteWorkflow resource with this namespace/name
w, err := p.wfStore.Get(ctx, namespace, name)
if err != nil {
if workflowstore.IsNotFound(err) {
w, fetchErr := p.wfStore.Get(ctx, namespace, name)
if fetchErr != nil {
if workflowstore.IsNotFound(fetchErr) {
p.metrics.WorkflowNotFound.Inc()
logger.Warningf(ctx, "Workflow namespace[%v]/name[%v] not found, may be deleted.", namespace, name)
return nil
}
if workflowstore.IsWorkflowStale(err) {
if workflowstore.IsWorkflowStale(fetchErr) {
p.metrics.RoundSkipped.Inc()
logger.Warningf(ctx, "Workflow namespace[%v]/name[%v] Stale.", namespace, name)
return nil
}
logger.Warningf(ctx, "Failed to GetWorkflow, retrying with back-off", err)
return err
logger.Warningf(ctx, "Failed to GetWorkflow, retrying with back-off: %v", fetchErr)
return fetchErr
}

t := p.metrics.DeepCopyTime.Start()
wfDeepCopy := w.DeepCopy()
t.Stop()
ctx = contextutils.WithWorkflowID(ctx, wfDeepCopy.GetID())
if execID := wfDeepCopy.GetExecutionID(); execID.WorkflowExecutionIdentifier != nil {
ctx = contextutils.WithProjectDomain(ctx, wfDeepCopy.GetExecutionID().Project, wfDeepCopy.GetExecutionID().Domain)
if w.GetExecutionStatus().IsTerminated() {
if HasCompletedLabel(w) && !HasFinalizer(w) {
logger.Debugf(ctx, "Workflow is terminated.")
// This workflow had previously completed, let us ignore it
return nil
}
}
ctx = contextutils.WithResourceVersion(ctx, wfDeepCopy.GetResourceVersion())

maxRetries := uint32(p.cfg.MaxWorkflowRetries)
if IsDeleted(wfDeepCopy) || (wfDeepCopy.Status.FailedAttempts > maxRetries) {
var err error
func() {
defer func() {
if r := recover(); r != nil {
stack := debug.Stack()
err = fmt.Errorf("panic when aborting workflow, Stack: [%s]", string(stack))
logger.Errorf(ctx, err.Error())
p.metrics.PanicObserved.Inc(ctx)
}
}()
err = p.workflowExecutor.HandleAbortedWorkflow(ctx, wfDeepCopy, maxRetries)
}()
if err != nil {
p.metrics.AbortError.Inc(ctx)
return err
}
mutatedWf, err := p.TryMutateWorkflow(ctx, w)
if err != nil {
// NOTE: We are overriding the deep copy here, as we are essentially ignoring all mutations.
// We only want to increase failed attempts and discard any other partial changes to the CRD.
mutatedWf = RecordSystemError(w, err)
p.metrics.SystemError.Inc(ctx)
} else if mutatedWf == nil {
return nil
} else {
if wfDeepCopy.GetExecutionStatus().IsTerminated() {
if HasCompletedLabel(wfDeepCopy) && !HasFinalizer(wfDeepCopy) {
logger.Debugf(ctx, "Workflow is terminated.")
if !w.GetExecutionStatus().IsTerminated() {
// No updates in the status we detected, we will skip writing to KubeAPI
if mutatedWf.Status.Equals(&w.Status) {
logger.Info(ctx, "WF hasn't been updated in this round.")
return nil
}
// NOTE: This should never really happen, but in case we externally mark the workflow as terminated
// We should allow cleanup
logger.Warn(ctx, "Workflow is marked as terminated but doesn't have the completed label, marking it as completed.")
} else {
SetFinalizerIfEmpty(wfDeepCopy, FinalizerKey)

func() {
t := p.metrics.RawWorkflowTraversalTime.Start(ctx)
defer func() {
t.Stop()
if r := recover(); r != nil {
stack := debug.Stack()
err = fmt.Errorf("panic when reconciling workflow, Stack: [%s]", string(stack))
logger.Errorf(ctx, err.Error())
p.metrics.PanicObserved.Inc(ctx)
}
}()
err = p.workflowExecutor.HandleFlyteWorkflow(ctx, wfDeepCopy)
}()

if err != nil {
logger.Errorf(ctx, "Error when trying to reconcile workflow. Error [%v]. Error Type[%v]. Is nil [%v]",
err, reflect.TypeOf(err), err == nil)

// Let's mark these as system errors.
// We only want to increase failed attempts and discard any other partial changes to the CRD.
wfDeepCopy = w.DeepCopy()
wfDeepCopy.GetExecutionStatus().IncFailedAttempts()
wfDeepCopy.GetExecutionStatus().SetMessage(err.Error())
p.metrics.SystemError.Inc(ctx)
} else {
// No updates in the status we detected, we will skip writing to KubeAPI
if wfDeepCopy.Status.Equals(&w.Status) {
logger.Info(ctx, "WF hasn't been updated in this round.")
return nil
}
}
}
}
// If the end result is a terminated workflow, we remove the labels
if wfDeepCopy.GetExecutionStatus().IsTerminated() {
// We add a completed label so that we can avoid polling for this workflow
SetCompletedLabel(wfDeepCopy, time.Now())
ResetFinalizers(wfDeepCopy)
if mutatedWf.GetExecutionStatus().IsTerminated() {
// If the end result is a terminated workflow, we remove the labels
// We add a completed label so that we can avoid polling for this workflow
SetCompletedLabel(mutatedWf, time.Now())
ResetFinalizers(mutatedWf)
}
}
// TODO we will need to call UpdateStatus when it is supported. But to preserve metadata (labels/finalizers) we will need to use Update

// update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not
// allow changes to the Spec of the resource, which is ideal for ensuring
// nothing other than resource status has been updated.
_, err = p.wfStore.Update(ctx, wfDeepCopy, workflowstore.PriorityClassCritical)
_, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical)
if updateErr != nil {
return updateErr
}
return err
}
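
Both the abort and reconcile paths in TryMutateWorkflow above wrap the executor call so that a panic is converted into an ordinary error (and therefore counted as a failed attempt) instead of crashing the controller. A stripped-down sketch of that wrapper, standard library only:

package main

import (
	"fmt"
	"runtime/debug"
)

// runGuarded invokes fn and turns any panic into an error carrying the stack
// trace, so the caller can log it and keep processing other workflows.
func runGuarded(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic when reconciling workflow: %v, Stack: [%s]", r, debug.Stack())
		}
	}()
	return fn()
}

func main() {
	err := runGuarded(func() error { panic("boom") })
	fmt.Println(err != nil) // true
}
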

61 changes: 57 additions & 4 deletions pkg/controller/handler_test.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -51,6 +52,31 @@ func TestPropeller_Handle(t *testing.T) {
assert.NoError(t, p.Handle(ctx, namespace, name))
})

t.Run("terminated-and-finalized", func(t *testing.T) {
w := &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: namespace,
},
WorkflowSpec: &v1alpha1.WorkflowSpec{
ID: "w1",
},
Status: v1alpha1.WorkflowStatus{
Phase: v1alpha1.WorkflowPhaseFailed,
},
}
SetCompletedLabel(w, time.Now())
assert.NoError(t, s.Create(ctx, w))
assert.NoError(t, p.Handle(ctx, namespace, name))

r, err := s.Get(ctx, namespace, name)
assert.NoError(t, err)
assert.Equal(t, v1alpha1.WorkflowPhaseFailed, r.GetExecutionStatus().GetPhase())
assert.Equal(t, 0, len(r.Finalizers))
assert.True(t, HasCompletedLabel(r))
assert.Equal(t, uint32(0), r.Status.FailedAttempts)
})

t.Run("terminated", func(t *testing.T) {
assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
@@ -71,6 +97,7 @@ func TestPropeller_Handle(t *testing.T) {
assert.Equal(t, v1alpha1.WorkflowPhaseFailed, r.GetExecutionStatus().GetPhase())
assert.Equal(t, 0, len(r.Finalizers))
assert.True(t, HasCompletedLabel(r))
assert.Equal(t, uint32(0), r.Status.FailedAttempts)
})

t.Run("happy", func(t *testing.T) {
@@ -94,6 +121,7 @@ func TestPropeller_Handle(t *testing.T) {
assert.Equal(t, v1alpha1.WorkflowPhaseSucceeding, r.GetExecutionStatus().GetPhase())
assert.Equal(t, 1, len(r.Finalizers))
assert.False(t, HasCompletedLabel(r))
assert.Equal(t, uint32(0), r.Status.FailedAttempts)
})

t.Run("error", func(t *testing.T) {
@@ -109,7 +137,7 @@ func TestPropeller_Handle(t *testing.T) {
exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error {
return fmt.Errorf("failed")
}
assert.NoError(t, p.Handle(ctx, namespace, name))
assert.Error(t, p.Handle(ctx, namespace, name))

r, err := s.Get(ctx, namespace, name)
assert.NoError(t, err)
@@ -147,6 +175,32 @@ func TestPropeller_Handle(t *testing.T) {
assert.Equal(t, uint32(1), r.Status.FailedAttempts)
})

t.Run("abort-error", func(t *testing.T) {
assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: namespace,
},
WorkflowSpec: &v1alpha1.WorkflowSpec{
ID: "w1",
},
Status: v1alpha1.WorkflowStatus{
FailedAttempts: 1,
Phase: v1alpha1.WorkflowPhaseRunning,
},
}))
exec.HandleAbortedCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow, maxRetries uint32) error {
return fmt.Errorf("abort error")
}
assert.Error(t, p.Handle(ctx, namespace, name))
r, err := s.Get(ctx, namespace, name)
assert.NoError(t, err)
assert.Equal(t, v1alpha1.WorkflowPhaseRunning, r.GetExecutionStatus().GetPhase())
assert.Equal(t, 0, len(r.Finalizers))
assert.False(t, HasCompletedLabel(r))
assert.Equal(t, uint32(2), r.Status.FailedAttempts)
})

t.Run("abort_panics", func(t *testing.T) {
assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
@@ -168,12 +222,11 @@ func TestPropeller_Handle(t *testing.T) {
assert.Error(t, p.Handle(ctx, namespace, name))

r, err := s.Get(ctx, namespace, name)

assert.NoError(t, err)
assert.Equal(t, v1alpha1.WorkflowPhaseRunning, r.GetExecutionStatus().GetPhase())
assert.Equal(t, 1, len(r.Finalizers))
assert.False(t, HasCompletedLabel(r))
assert.Equal(t, uint32(1), r.Status.FailedAttempts)
assert.Equal(t, uint32(2), r.Status.FailedAttempts)
})

t.Run("noUpdate", func(t *testing.T) {
@@ -220,7 +273,7 @@ func TestPropeller_Handle(t *testing.T) {
exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error {
panic("error")
}
assert.NoError(t, p.Handle(ctx, namespace, name))
assert.Error(t, p.Handle(ctx, namespace, name))

r, err := s.Get(ctx, namespace, name)
assert.NoError(t, err)
1 change: 1 addition & 0 deletions pkg/controller/workflow/executor.go
@@ -428,6 +428,7 @@ func (c *workflowExecutor) HandleAbortedWorkflow(ctx context.Context, w *v1alpha
// Best effort clean-up.
if err2 := c.cleanupRunningNodes(ctx, w, reason); err2 != nil {
logger.Errorf(ctx, "Failed to propagate Abort for workflow:%v. Error: %v", w.ExecutionID.WorkflowExecutionIdentifier, err2)
return err2
}

var status Status
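
Handle now returns errors instead of swallowing them (note the test changes from assert.NoError to assert.Error above), presumably so the controller's rate-limited work queue can requeue the key with exponential backoff, which is the "backoff queue" the commit title refers to. A generic client-go worker loop illustrating that behaviour; this is a sketch of the standard pattern, not Propeller's actual controller code:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// processNext handles one key: on error the key is re-added through the rate
// limiter so retries back off exponentially; on success the backoff is reset.
func processNext(q workqueue.RateLimitingInterface, handle func(key string) error) bool {
	item, shutdown := q.Get()
	if shutdown {
		return false
	}
	defer q.Done(item)

	key := item.(string)
	if err := handle(key); err != nil {
		q.AddRateLimited(key)
		return true
	}
	q.Forget(key)
	return true
}

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	q.Add("flyteexamples-development/my-workflow") // hypothetical namespace/name key
	go func() {
		for processNext(q, func(key string) error {
			fmt.Println("reconciling", key)
			return nil
		}) {
		}
	}()
	time.Sleep(100 * time.Millisecond)
	q.ShutDown()
}
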