Skip to content

Commit

Permalink
Large workflows will not cause workflows to stay in running state #mi…
Browse files Browse the repository at this point in the history
…nor (flyteorg#240)

* Large workflows will not cause workflows to stay in running state

- Workflows will be moved to Failing state
- In case of errors, the whole thing will be retried - system error
number of times

Signed-off-by: Ketan Umare <[email protected]>

* Updated docs

Signed-off-by: Ketan Umare <[email protected]>

* addressed comments

Signed-off-by: Ketan Umare <[email protected]>

* Better docs

Signed-off-by: Ketan Umare <[email protected]>

* Linter fix

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Mar 27, 2021
1 parent 8441d8b commit 380ded7
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 9 deletions.
2 changes: 2 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Package controller contains the K8s controller logic. This does not contain the actual workflow reconciliation.
// It is the entrypoint into the K8s based Flyte controller.
package controller

import (
Expand Down
29 changes: 28 additions & 1 deletion pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"runtime/debug"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

Expand Down Expand Up @@ -52,6 +54,7 @@ func newPropellerMetrics(scope promutils.Scope) *propellerMetrics {
}
}

// Helper method to record system error in the workflow.
func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWorkflow {
// Let's mark these as system errors.
// We only want to increase failed attempts and discard any other partial changes to the CRD.
Expand All @@ -61,17 +64,21 @@ func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWork
return wfDeepCopy
}

// Propeller is the core structure that houses the reconciliation loop for Flytepropeller.
type Propeller struct {
// wfStore is the workflow store; Handle writes mutated workflows back through its Update method.
wfStore workflowstore.FlyteWorkflow
// workflowExecutor is the downstream workflow executor that Initialize delegates to.
workflowExecutor executors.Workflow
// metrics holds the propeller metrics used in this file (e.g. the DeepCopyTime timer and the SystemError metric).
metrics *propellerMetrics
// cfg is the controller configuration.
// NOTE(review): presumably supplied through NewPropellerHandler — confirm against the constructor body.
cfg *config.Config
}

// Initialize prepares the Propeller for use by initializing its workflow executor.
// Any error reported by the executor is returned to the caller unchanged.
func (p *Propeller) Initialize(ctx context.Context) error {
	if err := p.workflowExecutor.Initialize(ctx); err != nil {
		return err
	}
	return nil
}

// TryMutateWorkflow will try to mutate the workflow by traversing it and reconciling the desired and actual state.
// The desired state here is that the entire workflow is completed; the actual state is each node's current execution state.
func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) {

t := p.metrics.DeepCopyTime.Start()
Expand Down Expand Up @@ -134,7 +141,8 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F
return mutableW, nil
}

// reconciler compares the actual state with the desired, and attempts to
// Handle method is the entry point for the reconciler.
// It compares the actual state with the desired, and attempts to
// converge the two. It then updates the GetExecutionStatus block of the FlyteWorkflow resource
// with the current status of the resource.
// Every FlyteWorkflow transitions through the following
Expand Down Expand Up @@ -227,6 +235,24 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
newWf, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical)
if updateErr != nil {
t.Stop()
// The update has failed; let's check whether this is because the workflow is too large. If so, try to mark the workflow as failing.
if workflowstore.IsWorkflowTooLarge(updateErr) {
logger.Errorf(ctx, "Failed storing workflow to the store, reason: %s", updateErr)
p.metrics.SystemError.Inc(ctx)
// Workflow is too large, we will mark the workflow as failing and record it. This will automatically
// propagate the failure in the next round.
mutableW := w.DeepCopy()
mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: "WorkflowTooLarge",
Message: "Workflow execution state is too large for Flyte to handle.",
})
if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil {
logger.Errorf(ctx, "Failed recording a large workflow as failed, reason: %s. Retrying...", e)
return e
}
return nil
}
return updateErr
}
if err != nil {
Expand All @@ -249,6 +275,7 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
return nil
}

// NewPropellerHandler creates a new Propeller and initializes metrics
func NewPropellerHandler(_ context.Context, cfg *config.Config, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller {

metrics := newPropellerMetrics(scope)
Expand Down
92 changes: 91 additions & 1 deletion pkg/controller/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"testing"
"time"

"github.com/flyteorg/flytepropeller/pkg/controller/workflowstore/mocks"
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -52,6 +56,15 @@ func TestPropeller_Handle(t *testing.T) {
assert.NoError(t, p.Handle(ctx, namespace, name))
})

t.Run("stale", func(t *testing.T) {
scope := promutils.NewTestScope()
s := &mocks.FlyteWorkflow{}
exec := &mockExecutor{}
p := NewPropellerHandler(ctx, cfg, s, exec, scope)
s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrStaleWorkflowError, "stale")).Once()
assert.NoError(t, p.Handle(ctx, namespace, name))
})

t.Run("terminated-and-finalized", func(t *testing.T) {
w := &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
Expand Down Expand Up @@ -450,7 +463,6 @@ func TestPropeller_Handle(t *testing.T) {
}

func TestPropeller_Handle_TurboMode(t *testing.T) {
// TODO unit tests need to fixed
scope := promutils.NewTestScope()
ctx := context.TODO()
s := workflowstore.NewInMemoryWorkflowStore()
Expand Down Expand Up @@ -669,3 +681,81 @@ func TestPropellerHandler_Initialize(t *testing.T) {

assert.NoError(t, p.Initialize(ctx))
}

// TestNewPropellerHandler_UpdateFailure verifies how Propeller.Handle reacts when the workflow store rejects an
// update: a generic failure, a "too large" failure that repeats on the follow-up write, and a "too large" failure
// followed by a successful follow-up write.
func TestNewPropellerHandler_UpdateFailure(t *testing.T) {
	const (
		namespace = "test"
		name      = "123"
	)

	ctx := context.TODO()
	cfg := &config.Config{
		MaxWorkflowRetries: 0,
	}

	// newFixture wires a fresh mocked store and executor into a new handler, and makes every Get on the store
	// return a new minimal workflow carrying the test namespace and name.
	newFixture := func() (*mocks.FlyteWorkflow, *mockExecutor, *Propeller) {
		scope := promutils.NewTestScope()
		store := &mocks.FlyteWorkflow{}
		executor := &mockExecutor{}
		handler := NewPropellerHandler(ctx, cfg, store, executor, scope)
		workflow := &v1alpha1.FlyteWorkflow{
			ObjectMeta:   v1.ObjectMeta{Name: name, Namespace: namespace},
			WorkflowSpec: &v1alpha1.WorkflowSpec{ID: "w1"},
		}
		store.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(workflow, nil)
		return store, executor, handler
	}

	// A store error that is not the "too large" sentinel must surface from Handle.
	t.Run("unknown error", func(t *testing.T) {
		store, _, handler := newFixture()
		store.OnUpdateMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("unknown error")).Once()

		handleErr := handler.Handle(ctx, namespace, name)
		assert.Error(t, handleErr)
	})

	// Both the original update and the follow-up update fail as "too large": Handle must return an error.
	t.Run("too-large-fail-repeat", func(t *testing.T) {
		store, _, handler := newFixture()
		store.OnUpdateMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Twice()

		handleErr := handler.Handle(ctx, namespace, name)
		assert.Error(t, handleErr)
	})

	// The first update fails as "too large" but the follow-up update succeeds: Handle must not return an error.
	t.Run("too-large-success", func(t *testing.T) {
		store, executor, handler := newFixture()
		executor.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error {
			w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseRunning, "done", nil)
			return nil
		}
		store.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Once()
		store.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Once()

		handleErr := handler.Handle(ctx, namespace, name)
		assert.NoError(t, handleErr)
	})
}
23 changes: 19 additions & 4 deletions pkg/controller/workflowstore/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,28 @@ import (
"github.com/pkg/errors"
)

var errStaleWorkflowError = fmt.Errorf("stale Workflow Found error")
var errWorkflowNotFound = fmt.Errorf("workflow not-found error")
// ErrStaleWorkflowError signals that the local copy of the workflow is stale, i.e., a new version was written to the
// datastore, but the informer cache has not yet synced to the latest copy.
var ErrStaleWorkflowError = fmt.Errorf("stale Workflow Found error")

// ErrWorkflowNotFound indicates that the workflow does not exist and it is safe to ignore the event
var ErrWorkflowNotFound = fmt.Errorf("workflow not-found error")

// ErrWorkflowToLarge is returned in case an update operation fails because the Workflow object (CRD) has surpassed the
// datastore's supported limit.
var ErrWorkflowToLarge = fmt.Errorf("workflow too large")

// IsNotFound reports whether the root cause of err (as unwrapped by errors.Cause) is the workflow-not-found sentinel.
// NOTE(review): this block carries two return statements — one comparing against the unexported errWorkflowNotFound
// and one against the exported ErrWorkflowNotFound. As written only the first executes and the second is unreachable.
// This looks like the before/after lines of a rename shown together — confirm which sentinel is intended and drop
// the other.
func IsNotFound(err error) bool {
return errors.Cause(err) == errWorkflowNotFound
return errors.Cause(err) == ErrWorkflowNotFound
}

// IsWorkflowStale reports whether the root cause of err (as unwrapped by errors.Cause) is the stale-workflow sentinel.
// NOTE(review): this block carries two return statements — one comparing against the unexported errStaleWorkflowError
// and one against the exported ErrStaleWorkflowError. As written only the first executes and the second is
// unreachable. This looks like the before/after lines of a rename shown together — confirm which sentinel is
// intended and drop the other.
func IsWorkflowStale(err error) bool {
return errors.Cause(err) == errStaleWorkflowError
return errors.Cause(err) == ErrStaleWorkflowError
}

// IsWorkflowTooLarge reports whether the root cause of err (as unwrapped by errors.Cause) is ErrWorkflowToLarge.
func IsWorkflowTooLarge(err error) bool {
	rootCause := errors.Cause(err)
	return rootCause == ErrWorkflowToLarge
}
2 changes: 2 additions & 0 deletions pkg/controller/workflowstore/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

//go:generate mockery -all

type PriorityClass int

const (
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/workflowstore/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (i *InmemoryWorkflowStore) Get(ctx context.Context, namespace, name string)
return v, nil
}
}
return nil, errWorkflowNotFound
return nil, ErrWorkflowNotFound
}

func (i *InmemoryWorkflowStore) UpdateStatus(ctx context.Context, w *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
Expand Down
140 changes: 140 additions & 0 deletions pkg/controller/workflowstore/mocks/FlyteWorkflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 380ded7

Please sign in to comment.