From 380ded76c5c976bb6094c79baf25d738ec49eedb Mon Sep 17 00:00:00 2001 From: Ketan Umare <16888709+kumare3@users.noreply.github.com> Date: Fri, 26 Mar 2021 18:02:56 -0700 Subject: [PATCH] Large workflows will not cause workflows to stay in running state #minor (#240) * Large workflows will not cause workflows to stay in running state - Workflows will be moved to Failing state - Incase of errors, the whole thing will be retried - system error number of times Signed-off-by: Ketan Umare * Updated docs Signed-off-by: Ketan Umare * addressed comments Signed-off-by: Ketan Umare * Better docs Signed-off-by: Ketan Umare * Linter fix Signed-off-by: Ketan Umare --- pkg/controller/controller.go | 2 + pkg/controller/handler.go | 29 +++- pkg/controller/handler_test.go | 92 +++++++++++- pkg/controller/workflowstore/errors.go | 23 ++- pkg/controller/workflowstore/iface.go | 2 + pkg/controller/workflowstore/inmemory.go | 2 +- .../workflowstore/mocks/FlyteWorkflow.go | 140 ++++++++++++++++++ pkg/controller/workflowstore/passthrough.go | 12 +- .../workflowstore/resource_version_caching.go | 2 +- 9 files changed, 295 insertions(+), 9 deletions(-) create mode 100644 pkg/controller/workflowstore/mocks/FlyteWorkflow.go diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index cbae394824..988c180e54 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -1,3 +1,5 @@ +// Package controller contains the K8s controller logic. This does not contain the actual workflow reconciliation. +// It is the entrypoint into the K8s based Flyte controller. 
package controller import ( diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index 56fe63dd1c..24afc1115d 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -7,6 +7,8 @@ import ( "runtime/debug" "time" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytestdlib/contextutils" "github.com/flyteorg/flytestdlib/promutils/labeled" @@ -52,6 +54,7 @@ func newPropellerMetrics(scope promutils.Scope) *propellerMetrics { } } +// Helper method to record system error in the workflow. func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWorkflow { // Let's mark these as system errors. // We only want to increase failed attempts and discard any other partial changes to the CRD. @@ -61,6 +64,7 @@ func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWork return wfDeepCopy } +// Core Propeller structure that houses the Reconciliation loop for Flytepropeller type Propeller struct { wfStore workflowstore.FlyteWorkflow workflowExecutor executors.Workflow @@ -68,10 +72,13 @@ type Propeller struct { cfg *config.Config } +// Initializes all downstream executors func (p *Propeller) Initialize(ctx context.Context) error { return p.workflowExecutor.Initialize(ctx) } +// TryMutateWorkflow will try to mutate the workflow by traversing it and reconciling the desired and actual state. +// The desired state here is the entire workflow is completed, actual state is each nodes current execution state. func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) { t := p.metrics.DeepCopyTime.Start() @@ -134,7 +141,8 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F return mutableW, nil } -// reconciler compares the actual state with the desired, and attempts to +// Handle method is the entry point for the reconciler. 
+// It compares the actual state with the desired, and attempts to // converge the two. It then updates the GetExecutionStatus block of the FlyteWorkflow resource // with the current status of the resource. // Every FlyteWorkflow transitions through the following @@ -227,6 +235,24 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { newWf, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical) if updateErr != nil { t.Stop() + // The update has failed, lets check if this is because the size is too large. If so + if workflowstore.IsWorkflowTooLarge(updateErr) { + logger.Errorf(ctx, "Failed storing workflow to the store, reason: %s", updateErr) + p.metrics.SystemError.Inc(ctx) + // Workflow is too large, we will mark the workflow as failing and record it. This will automatically + // propagate the failure in the next round. + mutableW := w.DeepCopy() + mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{ + Kind: core.ExecutionError_SYSTEM, + Code: "WorkflowTooLarge", + Message: "Workflow execution state is too large for Flyte to handle.", + }) + if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { + logger.Errorf(ctx, "Failed recording a large workflow as failed, reason: %s. 
Retrying...", e) + return e + } + return nil + } return updateErr } if err != nil { @@ -249,6 +275,7 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { return nil } +// NewPropellerHandler creates a new Propeller and initializes metrics func NewPropellerHandler(_ context.Context, cfg *config.Config, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller { metrics := newPropellerMetrics(scope) diff --git a/pkg/controller/handler_test.go b/pkg/controller/handler_test.go index a23b4cd80d..b42667340d 100644 --- a/pkg/controller/handler_test.go +++ b/pkg/controller/handler_test.go @@ -6,6 +6,10 @@ import ( "testing" "time" + "github.com/flyteorg/flytepropeller/pkg/controller/workflowstore/mocks" + "github.com/pkg/errors" + "github.com/stretchr/testify/mock" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -52,6 +56,15 @@ func TestPropeller_Handle(t *testing.T) { assert.NoError(t, p.Handle(ctx, namespace, name)) }) + t.Run("stale", func(t *testing.T) { + scope := promutils.NewTestScope() + s := &mocks.FlyteWorkflow{} + exec := &mockExecutor{} + p := NewPropellerHandler(ctx, cfg, s, exec, scope) + s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrStaleWorkflowError, "stale")).Once() + assert.NoError(t, p.Handle(ctx, namespace, name)) + }) + t.Run("terminated-and-finalized", func(t *testing.T) { w := &v1alpha1.FlyteWorkflow{ ObjectMeta: v1.ObjectMeta{ @@ -450,7 +463,6 @@ func TestPropeller_Handle(t *testing.T) { } func TestPropeller_Handle_TurboMode(t *testing.T) { - // TODO unit tests need to fixed scope := promutils.NewTestScope() ctx := context.TODO() s := workflowstore.NewInMemoryWorkflowStore() @@ -669,3 +681,81 @@ func TestPropellerHandler_Initialize(t *testing.T) { assert.NoError(t, p.Initialize(ctx)) } + +func TestNewPropellerHandler_UpdateFailure(t *testing.T) { + ctx := context.TODO() + 
cfg := &config.Config{ + MaxWorkflowRetries: 0, + } + + const namespace = "test" + const name = "123" + + t.Run("unknown error", func(t *testing.T) { + scope := promutils.NewTestScope() + s := &mocks.FlyteWorkflow{} + exec := &mockExecutor{} + p := NewPropellerHandler(ctx, cfg, s, exec, scope) + wf := &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + } + s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil) + s.OnUpdateMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("unknown error")).Once() + + err := p.Handle(ctx, namespace, name) + assert.Error(t, err) + }) + + t.Run("too-large-fail-repeat", func(t *testing.T) { + scope := promutils.NewTestScope() + s := &mocks.FlyteWorkflow{} + exec := &mockExecutor{} + p := NewPropellerHandler(ctx, cfg, s, exec, scope) + wf := &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + } + s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil) + s.OnUpdateMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Twice() + + err := p.Handle(ctx, namespace, name) + assert.Error(t, err) + }) + + t.Run("too-large-success", func(t *testing.T) { + scope := promutils.NewTestScope() + s := &mocks.FlyteWorkflow{} + exec := &mockExecutor{} + p := NewPropellerHandler(ctx, cfg, s, exec, scope) + wf := &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + } + exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error { + w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseRunning, "done", nil) + return nil + } + s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil) + 
s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Once() + s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Once() + + err := p.Handle(ctx, namespace, name) + assert.NoError(t, err) + }) +} diff --git a/pkg/controller/workflowstore/errors.go b/pkg/controller/workflowstore/errors.go index 572508850a..8912d4791f 100644 --- a/pkg/controller/workflowstore/errors.go +++ b/pkg/controller/workflowstore/errors.go @@ -6,13 +6,28 @@ import ( "github.com/pkg/errors" ) -var errStaleWorkflowError = fmt.Errorf("stale Workflow Found error") -var errWorkflowNotFound = fmt.Errorf("workflow not-found error") +// ErrStaleWorkflowError signals that the local copy of workflow is stale, i.e., a new version was written to the datastore, +// but the informer cache has not yet synced to the latest copy. +var ErrStaleWorkflowError = fmt.Errorf("stale Workflow Found error") +// ErrWorkflowNotFound indicates that the workflow does not exist and it is safe to ignore the event +var ErrWorkflowNotFound = fmt.Errorf("workflow not-found error") + +// ErrWorkflowToLarge is returned in case an update operation fails because the Workflow object (CRD) has surpassed the datastore's +// supported limit. 
+var ErrWorkflowToLarge = fmt.Errorf("workflow too large") + +// IsNotFound returns true if the error is caused by ErrWorkflowNotFound func IsNotFound(err error) bool { - return errors.Cause(err) == errWorkflowNotFound + return errors.Cause(err) == ErrWorkflowNotFound } +// IsWorkflowStale returns true if the error is caused by ErrStaleWorkflowError func IsWorkflowStale(err error) bool { - return errors.Cause(err) == errStaleWorkflowError + return errors.Cause(err) == ErrStaleWorkflowError +} + +// IsWorkflowTooLarge returns true if the error is caused by ErrWorkflowToLarge +func IsWorkflowTooLarge(err error) bool { + return errors.Cause(err) == ErrWorkflowToLarge } diff --git a/pkg/controller/workflowstore/iface.go b/pkg/controller/workflowstore/iface.go index f224dfe797..84477450a9 100644 --- a/pkg/controller/workflowstore/iface.go +++ b/pkg/controller/workflowstore/iface.go @@ -6,6 +6,8 @@ import ( "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) +//go:generate mockery -all + type PriorityClass int const ( diff --git a/pkg/controller/workflowstore/inmemory.go b/pkg/controller/workflowstore/inmemory.go index fb80f97f91..874029222b 100644 --- a/pkg/controller/workflowstore/inmemory.go +++ b/pkg/controller/workflowstore/inmemory.go @@ -41,7 +41,7 @@ func (i *InmemoryWorkflowStore) Get(ctx context.Context, namespace, name string) return v, nil } } - return nil, errWorkflowNotFound + return nil, ErrWorkflowNotFound } func (i *InmemoryWorkflowStore) UpdateStatus(ctx context.Context, w *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) ( diff --git a/pkg/controller/workflowstore/mocks/FlyteWorkflow.go b/pkg/controller/workflowstore/mocks/FlyteWorkflow.go new file mode 100644 index 0000000000..c3ca6f1b24 --- /dev/null +++ b/pkg/controller/workflowstore/mocks/FlyteWorkflow.go @@ -0,0 +1,140 @@ +// Code generated by mockery v1.0.1. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + v1alpha1 "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + mock "github.com/stretchr/testify/mock" + + workflowstore "github.com/flyteorg/flytepropeller/pkg/controller/workflowstore" +) + +// FlyteWorkflow is an autogenerated mock type for the FlyteWorkflow type +type FlyteWorkflow struct { + mock.Mock +} + +type FlyteWorkflow_Get struct { + *mock.Call +} + +func (_m FlyteWorkflow_Get) Return(_a0 *v1alpha1.FlyteWorkflow, _a1 error) *FlyteWorkflow_Get { + return &FlyteWorkflow_Get{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *FlyteWorkflow) OnGet(ctx context.Context, namespace string, name string) *FlyteWorkflow_Get { + c := _m.On("Get", ctx, namespace, name) + return &FlyteWorkflow_Get{Call: c} +} + +func (_m *FlyteWorkflow) OnGetMatch(matchers ...interface{}) *FlyteWorkflow_Get { + c := _m.On("Get", matchers...) + return &FlyteWorkflow_Get{Call: c} +} + +// Get provides a mock function with given fields: ctx, namespace, name +func (_m *FlyteWorkflow) Get(ctx context.Context, namespace string, name string) (*v1alpha1.FlyteWorkflow, error) { + ret := _m.Called(ctx, namespace, name) + + var r0 *v1alpha1.FlyteWorkflow + if rf, ok := ret.Get(0).(func(context.Context, string, string) *v1alpha1.FlyteWorkflow); ok { + r0 = rf(ctx, namespace, name) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v1alpha1.FlyteWorkflow) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, namespace, name) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type FlyteWorkflow_Update struct { + *mock.Call +} + +func (_m FlyteWorkflow_Update) Return(newWF *v1alpha1.FlyteWorkflow, err error) *FlyteWorkflow_Update { + return &FlyteWorkflow_Update{Call: _m.Call.Return(newWF, err)} +} + +func (_m *FlyteWorkflow) OnUpdate(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass workflowstore.PriorityClass) *FlyteWorkflow_Update { + c 
:= _m.On("Update", ctx, workflow, priorityClass) + return &FlyteWorkflow_Update{Call: c} +} + +func (_m *FlyteWorkflow) OnUpdateMatch(matchers ...interface{}) *FlyteWorkflow_Update { + c := _m.On("Update", matchers...) + return &FlyteWorkflow_Update{Call: c} +} + +// Update provides a mock function with given fields: ctx, workflow, priorityClass +func (_m *FlyteWorkflow) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass workflowstore.PriorityClass) (*v1alpha1.FlyteWorkflow, error) { + ret := _m.Called(ctx, workflow, priorityClass) + + var r0 *v1alpha1.FlyteWorkflow + if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.FlyteWorkflow, workflowstore.PriorityClass) *v1alpha1.FlyteWorkflow); ok { + r0 = rf(ctx, workflow, priorityClass) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v1alpha1.FlyteWorkflow) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.FlyteWorkflow, workflowstore.PriorityClass) error); ok { + r1 = rf(ctx, workflow, priorityClass) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type FlyteWorkflow_UpdateStatus struct { + *mock.Call +} + +func (_m FlyteWorkflow_UpdateStatus) Return(newWF *v1alpha1.FlyteWorkflow, err error) *FlyteWorkflow_UpdateStatus { + return &FlyteWorkflow_UpdateStatus{Call: _m.Call.Return(newWF, err)} +} + +func (_m *FlyteWorkflow) OnUpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass workflowstore.PriorityClass) *FlyteWorkflow_UpdateStatus { + c := _m.On("UpdateStatus", ctx, workflow, priorityClass) + return &FlyteWorkflow_UpdateStatus{Call: c} +} + +func (_m *FlyteWorkflow) OnUpdateStatusMatch(matchers ...interface{}) *FlyteWorkflow_UpdateStatus { + c := _m.On("UpdateStatus", matchers...) 
+ return &FlyteWorkflow_UpdateStatus{Call: c} +} + +// UpdateStatus provides a mock function with given fields: ctx, workflow, priorityClass +func (_m *FlyteWorkflow) UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass workflowstore.PriorityClass) (*v1alpha1.FlyteWorkflow, error) { + ret := _m.Called(ctx, workflow, priorityClass) + + var r0 *v1alpha1.FlyteWorkflow + if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.FlyteWorkflow, workflowstore.PriorityClass) *v1alpha1.FlyteWorkflow); ok { + r0 = rf(ctx, workflow, priorityClass) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v1alpha1.FlyteWorkflow) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.FlyteWorkflow, workflowstore.PriorityClass) error); ok { + r1 = rf(ctx, workflow, priorityClass) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/pkg/controller/workflowstore/passthrough.go b/pkg/controller/workflowstore/passthrough.go index b837d28565..a72e0bc4a3 100644 --- a/pkg/controller/workflowstore/passthrough.go +++ b/pkg/controller/workflowstore/passthrough.go @@ -20,6 +20,7 @@ type workflowstoreMetrics struct { workflowUpdateFailedCount prometheus.Counter workflowUpdateSuccessCount prometheus.Counter workflowUpdateConflictCount prometheus.Counter + workflowTooLarge prometheus.Counter workflowUpdateLatency promutils.StopWatch } @@ -36,7 +37,7 @@ func (p *passthroughWorkflowStore) Get(ctx context.Context, namespace, name stri // processing. 
if kubeerrors.IsNotFound(err) { logger.Warningf(ctx, "Workflow not found in cache.") - return nil, errWorkflowNotFound + return nil, ErrWorkflowNotFound } return nil, err } @@ -58,6 +59,10 @@ func (p *passthroughWorkflowStore) UpdateStatus(ctx context.Context, workflow *v if kubeerrors.IsConflict(err) { p.metrics.workflowUpdateConflictCount.Inc() } + if kubeerrors.IsRequestEntityTooLargeError(err) { + p.metrics.workflowTooLarge.Inc() + return nil, ErrWorkflowToLarge + } p.metrics.workflowUpdateFailedCount.Inc() logger.Errorf(ctx, "Failed to update workflow status. Error [%v]", err) return nil, err @@ -82,6 +87,10 @@ func (p *passthroughWorkflowStore) Update(ctx context.Context, workflow *v1alpha if kubeerrors.IsConflict(err) { p.metrics.workflowUpdateConflictCount.Inc() } + if kubeerrors.IsRequestEntityTooLargeError(err) { + p.metrics.workflowTooLarge.Inc() + return nil, ErrWorkflowToLarge + } p.metrics.workflowUpdateFailedCount.Inc() logger.Errorf(ctx, "Failed to update workflow. Error [%v]", err) return nil, err @@ -101,6 +110,7 @@ func NewPassthroughWorkflowStore(_ context.Context, scope promutils.Scope, wfCli workflowUpdateConflictCount: scope.MustNewCounter("wf_update_conflict", "Failure to update ETCd because of conflict"), workflowUpdateSuccessCount: scope.MustNewCounter("wf_update_success", "Success in updating ETCd"), workflowUpdateLatency: scope.MustNewStopWatch("wf_update_latency", "Time taken to complete update/updatestatus", time.Millisecond), + workflowTooLarge: scope.MustNewCounter("wf_too_large", "Failure to update ETCd because of size of the workflow is too large."), } return &passthroughWorkflowStore{ diff --git a/pkg/controller/workflowstore/resource_version_caching.go b/pkg/controller/workflowstore/resource_version_caching.go index 287422d15c..141dc0bb6d 100644 --- a/pkg/controller/workflowstore/resource_version_caching.go +++ b/pkg/controller/workflowstore/resource_version_caching.go @@ -61,7 +61,7 @@ func (r *resourceVersionCaching) Get(ctx 
context.Context, namespace, name string if w != nil { if r.isResourceVersionSameAsPrevious(ctx, namespace, name, w.ResourceVersion) { - return nil, errStaleWorkflowError + return nil, ErrStaleWorkflowError } }