This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Added more tests
Ketan Umare committed Jun 9, 2020
1 parent 8af6fed commit bba4104
Showing 2 changed files with 196 additions and 17 deletions.
44 changes: 27 additions & 17 deletions pkg/controller/workflow/executor.go
@@ -417,18 +417,24 @@ func (c *workflowExecutor) HandleAbortedWorkflow(ctx context.Context, w *v1alpha1.FlyteWorkflow, maxRetries uint32) error {
 	w.DataReferenceConstructor = c.store

 	if !w.Status.IsTerminated() {
-		reason := "User initiated workflow abort."
+		reason := fmt.Sprintf("max number of system retry attempts [%d/%d] exhausted - system failure.", w.Status.FailedAttempts, maxRetries)
 		c.metrics.IncompleteWorkflowAborted.Inc(ctx)
+		var err error
+		// Check of the workflow was deleted and that caused the abort
+		if w.GetDeletionTimestamp() != nil {
+			reason = "User initiated workflow abort."
+		}
+
 		// We will always try to cleanup, even if we have extinguished all our retries
 		// TODO ABORT should have its separate set of retries
-		err := c.cleanupRunningNodes(ctx, w, reason)
-		// Best effort clean-up.
-		if err != nil && w.Status.FailedAttempts <= maxRetries {
-			logger.Errorf(ctx, "Failed to propagate Abort for workflow:%v. Error: %v", w.ExecutionID.WorkflowExecutionIdentifier, err)
-			return err
-		}
-
 		if w.Status.FailedAttempts > maxRetries {
-			reason = fmt.Sprintf("max number of system retry attempts [%d/%d] exhausted - system failure.", w.Status.FailedAttempts, maxRetries)
+			err = errors.Errorf(errors.RuntimeExecutionError, w.GetID(), "max number of system retry attempts [%d/%d] exhausted. Last known status message: %v", w.Status.FailedAttempts, maxRetries, w.Status.Message)
+		} else {
+			// Best effort clean-up.
+			if err2 := c.cleanupRunningNodes(ctx, w, reason); err2 != nil {
+				logger.Errorf(ctx, "Failed to propagate Abort for workflow:%v. Error: %v", w.ExecutionID.WorkflowExecutionIdentifier, err2)
+				return err2
+			}
+		}

 		var status Status
@@ -486,13 +492,17 @@ func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, ...
 		wfRecorder:     events.NewWorkflowEventRecorder(eventSink, workflowScope),
 		k8sRecorder:    k8sEventRecorder,
 		metadataPrefix: basePrefix,
-		metrics: &workflowMetrics{
-			AcceptedWorkflows:         labeled.NewCounter("accepted", "Number of workflows accepted by propeller", workflowScope),
-			FailureDuration:           labeled.NewStopWatch("failure_duration", "Indicates the total execution time of a failed workflow.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
-			SuccessDuration:           labeled.NewStopWatch("success_duration", "Indicates the total execution time of a successful workflow.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
-			IncompleteWorkflowAborted: labeled.NewCounter("workflow_aborted", "Indicates an inprogress execution was aborted", workflowScope, labeled.EmitUnlabeledMetric),
-			AcceptanceLatency:         labeled.NewStopWatch("acceptance_latency", "Delay between workflow creation and moving it to running state.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
-			CompletionLatency:         labeled.NewStopWatch("completion_latency", "Measures the time between when the WF moved to succeeding/failing state and when it finally moved to a terminal state.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
-		},
+		metrics:        newMetrics(workflowScope),
 	}, nil
 }
+
+func newMetrics(workflowScope promutils.Scope) *workflowMetrics {
+	return &workflowMetrics{
+		AcceptedWorkflows:         labeled.NewCounter("accepted", "Number of workflows accepted by propeller", workflowScope),
+		FailureDuration:           labeled.NewStopWatch("failure_duration", "Indicates the total execution time of a failed workflow.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
+		SuccessDuration:           labeled.NewStopWatch("success_duration", "Indicates the total execution time of a successful workflow.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
+		IncompleteWorkflowAborted: labeled.NewCounter("workflow_aborted", "Indicates an inprogress execution was aborted", workflowScope, labeled.EmitUnlabeledMetric),
+		AcceptanceLatency:         labeled.NewStopWatch("acceptance_latency", "Delay between workflow creation and moving it to running state.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
+		CompletionLatency:         labeled.NewStopWatch("completion_latency", "Measures the time between when the WF moved to succeeding/failing state and when it finally moved to a terminal state.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
+	}
+}
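
The net effect of the first hunk above: the abort reason now defaults to system-retry exhaustion, switches to a user-initiated abort only when the workflow object carries a deletion timestamp, and best-effort node clean-up is skipped once the retry budget is exhausted. A small, self-contained restatement of that decision, which the tests in the next file exercise branch by branch (illustration only, distilled from the diff; abortDecision is a made-up name, not a function in this repository):

package main

import "fmt"

// abortDecision mirrors the logic introduced in HandleAbortedWorkflow above:
// the reason defaults to retry exhaustion, is overridden to a user abort when
// the workflow has a deletion timestamp, and running nodes are only cleaned up
// while the system retry budget is not yet exhausted.
func abortDecision(deleted bool, failedAttempts, maxRetries uint32) (reason string, runCleanup bool) {
    reason = fmt.Sprintf("max number of system retry attempts [%d/%d] exhausted - system failure.", failedAttempts, maxRetries)
    if deleted {
        reason = "User initiated workflow abort."
    }
    return reason, failedAttempts <= maxRetries
}

func main() {
    fmt.Println(abortDecision(true, 1, 5))  // user abort, clean-up attempted
    fmt.Println(abortDecision(false, 6, 5)) // system-failure reason, clean-up skipped
}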
169 changes: 169 additions & 0 deletions pkg/controller/workflow/executor_test.go
@@ -11,6 +11,8 @@ import (

 	"github.com/lyft/flytestdlib/contextutils"
 	"github.com/lyft/flytestdlib/promutils/labeled"
+	"github.com/stretchr/testify/mock"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

 	"k8s.io/apimachinery/pkg/util/sets"

@@ -20,6 +22,7 @@ import (
 	"github.com/lyft/flytestdlib/logger"

 	eventsErr "github.com/lyft/flyteidl/clients/go/events/errors"
+
 	mocks2 "github.com/lyft/flytepropeller/pkg/controller/executors/mocks"
 	"github.com/lyft/flytepropeller/pkg/controller/nodes/task/catalog"
 	"github.com/lyft/flytepropeller/pkg/controller/nodes/task/fakeplugins"
@@ -655,3 +658,169 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) {
 		assert.True(t, wfErrors.Matches(err, wfErrors.EventRecordingError))
 	})
 }
+
+func TestWorkflowExecutor_HandleAbortedWorkflow(t *testing.T) {
+	ctx := context.TODO()
+
+	t.Run("user-initiated-fail", func(t *testing.T) {
+
+		nodeExec := &mocks2.Node{}
+		wExec := &workflowExecutor{
+			nodeExecutor: nodeExec,
+			metrics:      newMetrics(promutils.NewTestScope()),
+		}
+
+		nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
+
+		w := &v1alpha1.FlyteWorkflow{
+			ObjectMeta: v1.ObjectMeta{
+				DeletionTimestamp: &v1.Time{},
+			},
+			Status: v1alpha1.WorkflowStatus{
+				FailedAttempts: 1,
+			},
+			WorkflowSpec: &v1alpha1.WorkflowSpec{
+				Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
+					v1alpha1.StartNodeID: {},
+				},
+			},
+		}
+
+		assert.Error(t, wExec.HandleAbortedWorkflow(ctx, w, 5))
+
+		assert.Equal(t, uint32(1), w.Status.FailedAttempts)
+	})
+
+	t.Run("user-initiated-success", func(t *testing.T) {
+
+		var evs []*event.WorkflowExecutionEvent
+		nodeExec := &mocks2.Node{}
+		wExec := &workflowExecutor{
+			nodeExecutor: nodeExec,
+			wfRecorder: &events.MockRecorder{
+				RecordWorkflowEventCb: func(ctx context.Context, event *event.WorkflowExecutionEvent) error {
+					evs = append(evs, event)
+					return nil
+				},
+			},
+			metrics: newMetrics(promutils.NewTestScope()),
+		}
+
+		nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+		w := &v1alpha1.FlyteWorkflow{
+			ObjectMeta: v1.ObjectMeta{
+				DeletionTimestamp: &v1.Time{},
+			},
+			Status: v1alpha1.WorkflowStatus{
+				FailedAttempts: 1,
+			},
+			WorkflowSpec: &v1alpha1.WorkflowSpec{
+				Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
+					v1alpha1.StartNodeID: {},
+				},
+			},
+		}
+
+		assert.NoError(t, wExec.HandleAbortedWorkflow(ctx, w, 5))
+
+		assert.Equal(t, uint32(1), w.Status.FailedAttempts)
+		assert.Len(t, evs, 1)
+	})
+
+	t.Run("user-initiated-attempts-exhausted", func(t *testing.T) {
+
+		var evs []*event.WorkflowExecutionEvent
+		nodeExec := &mocks2.Node{}
+		wExec := &workflowExecutor{
+			nodeExecutor: nodeExec,
+			wfRecorder: &events.MockRecorder{
+				RecordWorkflowEventCb: func(ctx context.Context, event *event.WorkflowExecutionEvent) error {
+					evs = append(evs, event)
+					return nil
+				},
+			},
+			metrics: newMetrics(promutils.NewTestScope()),
+		}
+
+		nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+		w := &v1alpha1.FlyteWorkflow{
+			ObjectMeta: v1.ObjectMeta{
+				DeletionTimestamp: &v1.Time{},
+			},
+			Status: v1alpha1.WorkflowStatus{
+				FailedAttempts: 6,
+			},
+			WorkflowSpec: &v1alpha1.WorkflowSpec{
+				Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
+					v1alpha1.StartNodeID: {},
+				},
+			},
+		}
+
+		assert.NoError(t, wExec.HandleAbortedWorkflow(ctx, w, 5))
+
+		assert.Equal(t, uint32(6), w.Status.FailedAttempts)
+		assert.Len(t, evs, 1)
+	})
+
+	t.Run("failure-abort-success", func(t *testing.T) {
+		var evs []*event.WorkflowExecutionEvent
+		nodeExec := &mocks2.Node{}
+		wExec := &workflowExecutor{
+			nodeExecutor: nodeExec,
+			wfRecorder: &events.MockRecorder{
+				RecordWorkflowEventCb: func(ctx context.Context, event *event.WorkflowExecutionEvent) error {
+					evs = append(evs, event)
+					return nil
+				},
+			},
+			metrics: newMetrics(promutils.NewTestScope()),
+		}
+
+		nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+		w := &v1alpha1.FlyteWorkflow{
+			Status: v1alpha1.WorkflowStatus{
+				FailedAttempts: 5,
+			},
+			WorkflowSpec: &v1alpha1.WorkflowSpec{
+				Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
+					v1alpha1.StartNodeID: {},
+				},
+			},
+		}
+
+		assert.NoError(t, wExec.HandleAbortedWorkflow(ctx, w, 5))
+
+		assert.Equal(t, uint32(5), w.Status.FailedAttempts)
+		assert.Len(t, evs, 1)
+	})
+
+	t.Run("failure-abort-failed", func(t *testing.T) {
+
+		nodeExec := &mocks2.Node{}
+		wExec := &workflowExecutor{
+			nodeExecutor: nodeExec,
+			metrics:      newMetrics(promutils.NewTestScope()),
+		}
+
+		nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("err"))
+
+		w := &v1alpha1.FlyteWorkflow{
+			Status: v1alpha1.WorkflowStatus{
+				FailedAttempts: 1,
+			},
+			WorkflowSpec: &v1alpha1.WorkflowSpec{
+				Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
+					v1alpha1.StartNodeID: {},
+				},
+			},
+		}
+
+		assert.Error(t, wExec.HandleAbortedWorkflow(ctx, w, 5))
+
+		assert.Equal(t, uint32(1), w.Status.FailedAttempts)
+	})
+}
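
To run only the new tests locally, an invocation along these lines should work from the repository root (the package path is taken from the file headers above; the flags are standard go test options):

go test ./pkg/controller/workflow/ -run TestWorkflowExecutor_HandleAbortedWorkflow -v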
