From bba41048ce7988bfe95d481621ce48392d370c11 Mon Sep 17 00:00:00 2001
From: Ketan Umare
Date: Mon, 8 Jun 2020 17:23:16 -0700
Subject: [PATCH] Added more tests

---
 pkg/controller/workflow/executor.go      |  44 +++---
 pkg/controller/workflow/executor_test.go | 169 +++++++++++++++++++++++
 2 files changed, 196 insertions(+), 17 deletions(-)

diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go
index 7041dbd2e..75397ca3f 100644
--- a/pkg/controller/workflow/executor.go
+++ b/pkg/controller/workflow/executor.go
@@ -417,18 +417,24 @@ func (c *workflowExecutor) HandleAbortedWorkflow(ctx context.Context, w *v1alpha
 	w.DataReferenceConstructor = c.store
 
 	if !w.Status.IsTerminated() {
-		reason := "User initiated workflow abort."
+		reason := fmt.Sprintf("max number of system retry attempts [%d/%d] exhausted - system failure.", w.Status.FailedAttempts, maxRetries)
 		c.metrics.IncompleteWorkflowAborted.Inc(ctx)
-		var err error
+		// Check of the workflow was deleted and that caused the abort
+		if w.GetDeletionTimestamp() != nil {
+			reason = "User initiated workflow abort."
+		}
+
+		// We will always try to cleanup, even if we have extinguished all our retries
+		// TODO ABORT should have its separate set of retries
+		err := c.cleanupRunningNodes(ctx, w, reason)
+		// Best effort clean-up.
+		if err != nil && w.Status.FailedAttempts <= maxRetries {
+			logger.Errorf(ctx, "Failed to propagate Abort for workflow:%v. Error: %v", w.ExecutionID.WorkflowExecutionIdentifier, err)
+			return err
+		}
+
 		if w.Status.FailedAttempts > maxRetries {
-			reason = fmt.Sprintf("max number of system retry attempts [%d/%d] exhausted - system failure.", w.Status.FailedAttempts, maxRetries)
 			err = errors.Errorf(errors.RuntimeExecutionError, w.GetID(), "max number of system retry attempts [%d/%d] exhausted. Last known status message: %v", w.Status.FailedAttempts, maxRetries, w.Status.Message)
-		} else {
-			// Best effort clean-up.
-			if err2 := c.cleanupRunningNodes(ctx, w, reason); err2 != nil {
-				logger.Errorf(ctx, "Failed to propagate Abort for workflow:%v. Error: %v", w.ExecutionID.WorkflowExecutionIdentifier, err2)
-				return err2
-			}
 		}
 
 		var status Status
@@ -486,13 +492,17 @@ func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1al
 		wfRecorder:     events.NewWorkflowEventRecorder(eventSink, workflowScope),
 		k8sRecorder:    k8sEventRecorder,
 		metadataPrefix: basePrefix,
-		metrics: &workflowMetrics{
-			AcceptedWorkflows:         labeled.NewCounter("accepted", "Number of workflows accepted by propeller", workflowScope),
-			FailureDuration:           labeled.NewStopWatch("failure_duration", "Indicates the total execution time of a failed workflow.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
-			SuccessDuration:           labeled.NewStopWatch("success_duration", "Indicates the total execution time of a successful workflow.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
-			IncompleteWorkflowAborted: labeled.NewCounter("workflow_aborted", "Indicates an inprogress execution was aborted", workflowScope, labeled.EmitUnlabeledMetric),
-			AcceptanceLatency:         labeled.NewStopWatch("acceptance_latency", "Delay between workflow creation and moving it to running state.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
-			CompletionLatency:         labeled.NewStopWatch("completion_latency", "Measures the time between when the WF moved to succeeding/failing state and when it finally moved to a terminal state.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
-		},
+		metrics:        newMetrics(workflowScope),
 	}, nil
 }
+
+func newMetrics(workflowScope promutils.Scope) *workflowMetrics {
+	return &workflowMetrics{
+		AcceptedWorkflows:         labeled.NewCounter("accepted", "Number of workflows accepted by propeller", workflowScope),
+		FailureDuration:           labeled.NewStopWatch("failure_duration", "Indicates the total execution time of a failed workflow.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
+		SuccessDuration:           labeled.NewStopWatch("success_duration", "Indicates the total execution time of a successful workflow.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
+		IncompleteWorkflowAborted: labeled.NewCounter("workflow_aborted", "Indicates an inprogress execution was aborted", workflowScope, labeled.EmitUnlabeledMetric),
+		AcceptanceLatency:         labeled.NewStopWatch("acceptance_latency", "Delay between workflow creation and moving it to running state.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
+		CompletionLatency:         labeled.NewStopWatch("completion_latency", "Measures the time between when the WF moved to succeeding/failing state and when it finally moved to a terminal state.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
+	}
+}
diff --git a/pkg/controller/workflow/executor_test.go b/pkg/controller/workflow/executor_test.go
index f19194194..d00820a3a 100644
--- a/pkg/controller/workflow/executor_test.go
+++ b/pkg/controller/workflow/executor_test.go
@@ -11,6 +11,8 @@ import (
 	"github.com/lyft/flytestdlib/contextutils"
 	"github.com/lyft/flytestdlib/promutils/labeled"
 
+	"github.com/stretchr/testify/mock"
+
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
 
@@ -20,6 +22,7 @@ import (
 	"github.com/lyft/flytestdlib/logger"
 
 	eventsErr "github.com/lyft/flyteidl/clients/go/events/errors"
+	mocks2 "github.com/lyft/flytepropeller/pkg/controller/executors/mocks"
 	"github.com/lyft/flytepropeller/pkg/controller/nodes/task/catalog"
 	"github.com/lyft/flytepropeller/pkg/controller/nodes/task/fakeplugins"
 
@@ -655,3 +658,169 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) {
 		assert.True(t, wfErrors.Matches(err, wfErrors.EventRecordingError))
 	})
 }
+
+func TestWorkflowExecutor_HandleAbortedWorkflow(t *testing.T) {
+	ctx := context.TODO()
+
+	t.Run("user-initiated-fail", func(t *testing.T) {
+
+		nodeExec := &mocks2.Node{}
+		wExec := &workflowExecutor{
+			nodeExecutor: nodeExec,
+			metrics:      newMetrics(promutils.NewTestScope()),
+		}
+
+		nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
+
+		w := &v1alpha1.FlyteWorkflow{
+			ObjectMeta: v1.ObjectMeta{
+				DeletionTimestamp: &v1.Time{},
+			},
+			Status: v1alpha1.WorkflowStatus{
+				FailedAttempts: 1,
+			},
+			WorkflowSpec: &v1alpha1.WorkflowSpec{
+				Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
+					v1alpha1.StartNodeID: {},
+				},
+			},
+		}
+
+		assert.Error(t, wExec.HandleAbortedWorkflow(ctx, w, 5))
+
+		assert.Equal(t, uint32(1), w.Status.FailedAttempts)
+	})
+
+	t.Run("user-initiated-success", func(t *testing.T) {
+
+		var evs []*event.WorkflowExecutionEvent
+		nodeExec := &mocks2.Node{}
+		wExec := &workflowExecutor{
+			nodeExecutor: nodeExec,
+			wfRecorder: &events.MockRecorder{
+				RecordWorkflowEventCb: func(ctx context.Context, event *event.WorkflowExecutionEvent) error {
+					evs = append(evs, event)
+					return nil
+				},
+			},
+			metrics: newMetrics(promutils.NewTestScope()),
+		}
+
+		nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+		w := &v1alpha1.FlyteWorkflow{
+			ObjectMeta: v1.ObjectMeta{
+				DeletionTimestamp: &v1.Time{},
+			},
+			Status: v1alpha1.WorkflowStatus{
+				FailedAttempts: 1,
+			},
+			WorkflowSpec: &v1alpha1.WorkflowSpec{
+				Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
+					v1alpha1.StartNodeID: {},
+				},
+			},
+		}
+
+		assert.NoError(t, wExec.HandleAbortedWorkflow(ctx, w, 5))
+
+		assert.Equal(t, uint32(1), w.Status.FailedAttempts)
+		assert.Len(t, evs, 1)
+	})
+
+	t.Run("user-initiated-attempts-exhausted", func(t *testing.T) {
+
+		var evs []*event.WorkflowExecutionEvent
+		nodeExec := &mocks2.Node{}
+		wExec := &workflowExecutor{
+			nodeExecutor: nodeExec,
+			wfRecorder: &events.MockRecorder{
+				RecordWorkflowEventCb: func(ctx context.Context, event *event.WorkflowExecutionEvent) error {
+					evs = append(evs, event)
+					return nil
+				},
+			},
+			metrics: newMetrics(promutils.NewTestScope()),
+		}
+
+		nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+		w := &v1alpha1.FlyteWorkflow{
+			ObjectMeta: v1.ObjectMeta{
+				DeletionTimestamp: &v1.Time{},
+			},
+			Status: v1alpha1.WorkflowStatus{
+				FailedAttempts: 6,
+			},
+			WorkflowSpec: &v1alpha1.WorkflowSpec{
+				Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
+					v1alpha1.StartNodeID: {},
+				},
+			},
+		}
+
+		assert.NoError(t, wExec.HandleAbortedWorkflow(ctx, w, 5))
+
+		assert.Equal(t, uint32(6), w.Status.FailedAttempts)
+		assert.Len(t, evs, 1)
+	})
+
+	t.Run("failure-abort-success", func(t *testing.T) {
+		var evs []*event.WorkflowExecutionEvent
+		nodeExec := &mocks2.Node{}
+		wExec := &workflowExecutor{
+			nodeExecutor: nodeExec,
+			wfRecorder: &events.MockRecorder{
+				RecordWorkflowEventCb: func(ctx context.Context, event *event.WorkflowExecutionEvent) error {
+					evs = append(evs, event)
+					return nil
+				},
+			},
+			metrics: newMetrics(promutils.NewTestScope()),
+		}
+
+		nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+		w := &v1alpha1.FlyteWorkflow{
+			Status: v1alpha1.WorkflowStatus{
+				FailedAttempts: 5,
+			},
+			WorkflowSpec: &v1alpha1.WorkflowSpec{
+				Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
+					v1alpha1.StartNodeID: {},
+				},
+			},
+		}
+
+		assert.NoError(t, wExec.HandleAbortedWorkflow(ctx, w, 5))
+
+		assert.Equal(t, uint32(5), w.Status.FailedAttempts)
+		assert.Len(t, evs, 1)
+	})
+
+	t.Run("failure-abort-failed", func(t *testing.T) {
+
+		nodeExec := &mocks2.Node{}
+		wExec := &workflowExecutor{
+			nodeExecutor: nodeExec,
+			metrics:      newMetrics(promutils.NewTestScope()),
+		}
+
+		nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("err"))
+
+		w := &v1alpha1.FlyteWorkflow{
+			Status: v1alpha1.WorkflowStatus{
+				FailedAttempts: 1,
+			},
+			WorkflowSpec: &v1alpha1.WorkflowSpec{
+				Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
+					v1alpha1.StartNodeID: {},
+				},
+			},
+		}
+
+		assert.Error(t, wExec.HandleAbortedWorkflow(ctx, w, 5))
+
+		assert.Equal(t, uint32(1), w.Status.FailedAttempts)
+	})
+}