This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

bug; aborted workflows bubble up the error and stay in running #149

Merged
merged 3 commits on Jun 9, 2020
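What changed, in short: node cleanup during abort is now a best effort, and the "max number of system retry attempts exhausted" error is captured rather than returned, so the workflow can still be moved out of Running into a terminal phase; only a failure of the cleanup itself is bubbled up. A minimal, self-contained sketch of that control flow follows — the handleAborted helper, its signature, and the reason strings are illustrative stand-ins, not the executor's actual API:

package main

import (
	"errors"
	"fmt"
)

// handleAborted is an illustrative reduction of the patched abort flow:
// capture the "retries exhausted" condition, attempt cleanup as a best
// effort, and only bubble up a cleanup failure.
func handleAborted(failedAttempts, maxRetries uint32, cleanup func(reason string) error) error {
	reason := "user initiated workflow abort" // illustrative default reason
	var exhausted error
	if failedAttempts > maxRetries {
		reason = fmt.Sprintf("max number of system retry attempts [%d/%d] exhausted", failedAttempts, maxRetries)
		exhausted = errors.New(reason)
	}

	// Best effort clean-up: a failure here is the only error that escapes,
	// so the abort handler gets another chance to propagate the abort.
	if err := cleanup(reason); err != nil {
		return err
	}

	// The exhausted error is not returned; it only decides which terminal
	// phase gets recorded, so the workflow no longer lingers in Running.
	if exhausted != nil {
		fmt.Println("record terminal phase: Failed:", exhausted)
	} else {
		fmt.Println("record terminal phase: Aborted:", reason)
	}
	return nil
}

func main() {
	noop := func(string) error { return nil }
	failing := func(string) error { return errors.New("abort propagation failed") }

	fmt.Println(handleAborted(1, 5, noop))    // aborted cleanly -> <nil>
	fmt.Println(handleAborted(6, 5, noop))    // retries exhausted -> recorded as Failed, still <nil>
	fmt.Println(handleAborted(1, 5, failing)) // cleanup failed -> error bubbles up
}

The test cases added in executor_test.go below exercise the same outcomes: a cleanup failure returns an error, a clean user abort returns nil, and exhausted retries still return nil while recording a terminal event.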
42 changes: 26 additions & 16 deletions pkg/controller/workflow/executor.go
@@ -417,18 +417,24 @@ func (c *workflowExecutor) HandleAbortedWorkflow(ctx context.Context, w *v1alpha
w.DataReferenceConstructor = c.store

if !w.Status.IsTerminated() {
reason := "User initiated workflow abort."
reason := fmt.Sprintf("max number of system retry attempts [%d/%d] exhausted - system failure.", w.Status.FailedAttempts, maxRetries)
c.metrics.IncompleteWorkflowAborted.Inc(ctx)
var err error
if w.Status.FailedAttempts > maxRetries {
reason = fmt.Sprintf("max number of system retry attempts [%d/%d] exhausted - system failure.", w.Status.FailedAttempts, maxRetries)
err = errors.Errorf(errors.RuntimeExecutionError, w.GetID(), "max number of system retry attempts [%d/%d] exhausted. Last known status message: %v", w.Status.FailedAttempts, maxRetries, w.Status.Message)
// Check of the workflow was deleted and that caused the abort
if w.GetDeletionTimestamp() != nil {
reason = "Workflow aborted."
}

// We will always try to cleanup, even if we have extinguished all our retries
// TODO ABORT should have its separate set of retries
err := c.cleanupRunningNodes(ctx, w, reason)
// Best effort clean-up.
if err2 := c.cleanupRunningNodes(ctx, w, reason); err2 != nil {
logger.Errorf(ctx, "Failed to propagate Abort for workflow:%v. Error: %v", w.ExecutionID.WorkflowExecutionIdentifier, err2)
return err2
if err != nil && w.Status.FailedAttempts <= maxRetries {
logger.Errorf(ctx, "Failed to propagate Abort for workflow:%v. Error: %v", w.ExecutionID.WorkflowExecutionIdentifier, err)
return err
}

if w.Status.FailedAttempts > maxRetries {
err = errors.Errorf(errors.RuntimeExecutionError, w.GetID(), "max number of system retry attempts [%d/%d] exhausted. Last known status message: %v", w.Status.FailedAttempts, maxRetries, w.Status.Message)
}

var status Status
@@ -486,13 +492,17 @@ func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1al
wfRecorder: events.NewWorkflowEventRecorder(eventSink, workflowScope),
k8sRecorder: k8sEventRecorder,
metadataPrefix: basePrefix,
metrics: &workflowMetrics{
AcceptedWorkflows: labeled.NewCounter("accepted", "Number of workflows accepted by propeller", workflowScope),
FailureDuration: labeled.NewStopWatch("failure_duration", "Indicates the total execution time of a failed workflow.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
SuccessDuration: labeled.NewStopWatch("success_duration", "Indicates the total execution time of a successful workflow.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
IncompleteWorkflowAborted: labeled.NewCounter("workflow_aborted", "Indicates an inprogress execution was aborted", workflowScope, labeled.EmitUnlabeledMetric),
AcceptanceLatency: labeled.NewStopWatch("acceptance_latency", "Delay between workflow creation and moving it to running state.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
CompletionLatency: labeled.NewStopWatch("completion_latency", "Measures the time between when the WF moved to succeeding/failing state and when it finally moved to a terminal state.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
},
metrics: newMetrics(workflowScope),
}, nil
}

func newMetrics(workflowScope promutils.Scope) *workflowMetrics {
return &workflowMetrics{
AcceptedWorkflows: labeled.NewCounter("accepted", "Number of workflows accepted by propeller", workflowScope),
FailureDuration: labeled.NewStopWatch("failure_duration", "Indicates the total execution time of a failed workflow.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
SuccessDuration: labeled.NewStopWatch("success_duration", "Indicates the total execution time of a successful workflow.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
IncompleteWorkflowAborted: labeled.NewCounter("workflow_aborted", "Indicates an inprogress execution was aborted", workflowScope, labeled.EmitUnlabeledMetric),
AcceptanceLatency: labeled.NewStopWatch("acceptance_latency", "Delay between workflow creation and moving it to running state.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
CompletionLatency: labeled.NewStopWatch("completion_latency", "Measures the time between when the WF moved to succeeding/failing state and when it finally moved to a terminal state.", time.Millisecond, workflowScope, labeled.EmitUnlabeledMetric),
}
}
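Pulling the metric construction out of NewExecutor into a newMetrics helper lets the new tests below assemble a workflowExecutor directly around a test scope (newMetrics(promutils.NewTestScope())) without going through NewExecutor.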
169 changes: 169 additions & 0 deletions pkg/controller/workflow/executor_test.go
@@ -11,6 +11,8 @@ import (

"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/promutils/labeled"
"github.com/stretchr/testify/mock"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/util/sets"

@@ -20,6 +22,7 @@ import (
"github.com/lyft/flytestdlib/logger"

eventsErr "github.com/lyft/flyteidl/clients/go/events/errors"

mocks2 "github.com/lyft/flytepropeller/pkg/controller/executors/mocks"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task/catalog"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task/fakeplugins"
@@ -655,3 +658,169 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) {
assert.True(t, wfErrors.Matches(err, wfErrors.EventRecordingError))
})
}

func TestWorkflowExecutor_HandleAbortedWorkflow(t *testing.T) {
ctx := context.TODO()

t.Run("user-initiated-fail", func(t *testing.T) {

nodeExec := &mocks2.Node{}
wExec := &workflowExecutor{
nodeExecutor: nodeExec,
metrics: newMetrics(promutils.NewTestScope()),
}

nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("error"))

w := &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
DeletionTimestamp: &v1.Time{},
},
Status: v1alpha1.WorkflowStatus{
FailedAttempts: 1,
},
WorkflowSpec: &v1alpha1.WorkflowSpec{
Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
v1alpha1.StartNodeID: {},
},
},
}

assert.Error(t, wExec.HandleAbortedWorkflow(ctx, w, 5))

assert.Equal(t, uint32(1), w.Status.FailedAttempts)
})

t.Run("user-initiated-success", func(t *testing.T) {

var evs []*event.WorkflowExecutionEvent
nodeExec := &mocks2.Node{}
wExec := &workflowExecutor{
nodeExecutor: nodeExec,
wfRecorder: &events.MockRecorder{
RecordWorkflowEventCb: func(ctx context.Context, event *event.WorkflowExecutionEvent) error {
evs = append(evs, event)
return nil
},
},
metrics: newMetrics(promutils.NewTestScope()),
}

nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

w := &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
DeletionTimestamp: &v1.Time{},
},
Status: v1alpha1.WorkflowStatus{
FailedAttempts: 1,
},
WorkflowSpec: &v1alpha1.WorkflowSpec{
Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
v1alpha1.StartNodeID: {},
},
},
}

assert.NoError(t, wExec.HandleAbortedWorkflow(ctx, w, 5))

assert.Equal(t, uint32(1), w.Status.FailedAttempts)
assert.Len(t, evs, 1)
})

t.Run("user-initiated-attempts-exhausted", func(t *testing.T) {

var evs []*event.WorkflowExecutionEvent
nodeExec := &mocks2.Node{}
wExec := &workflowExecutor{
nodeExecutor: nodeExec,
wfRecorder: &events.MockRecorder{
RecordWorkflowEventCb: func(ctx context.Context, event *event.WorkflowExecutionEvent) error {
evs = append(evs, event)
return nil
},
},
metrics: newMetrics(promutils.NewTestScope()),
}

nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

w := &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
DeletionTimestamp: &v1.Time{},
},
Status: v1alpha1.WorkflowStatus{
FailedAttempts: 6,
},
WorkflowSpec: &v1alpha1.WorkflowSpec{
Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
v1alpha1.StartNodeID: {},
},
},
}

assert.NoError(t, wExec.HandleAbortedWorkflow(ctx, w, 5))

assert.Equal(t, uint32(6), w.Status.FailedAttempts)
assert.Len(t, evs, 1)
})

t.Run("failure-abort-success", func(t *testing.T) {
var evs []*event.WorkflowExecutionEvent
nodeExec := &mocks2.Node{}
wExec := &workflowExecutor{
nodeExecutor: nodeExec,
wfRecorder: &events.MockRecorder{
RecordWorkflowEventCb: func(ctx context.Context, event *event.WorkflowExecutionEvent) error {
evs = append(evs, event)
return nil
},
},
metrics: newMetrics(promutils.NewTestScope()),
}

nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

w := &v1alpha1.FlyteWorkflow{
Status: v1alpha1.WorkflowStatus{
FailedAttempts: 5,
},
WorkflowSpec: &v1alpha1.WorkflowSpec{
Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
v1alpha1.StartNodeID: {},
},
},
}

assert.NoError(t, wExec.HandleAbortedWorkflow(ctx, w, 5))

assert.Equal(t, uint32(5), w.Status.FailedAttempts)
assert.Len(t, evs, 1)
})

t.Run("failure-abort-failed", func(t *testing.T) {

nodeExec := &mocks2.Node{}
wExec := &workflowExecutor{
nodeExecutor: nodeExec,
metrics: newMetrics(promutils.NewTestScope()),
}

nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("err"))

w := &v1alpha1.FlyteWorkflow{
Status: v1alpha1.WorkflowStatus{
FailedAttempts: 1,
},
WorkflowSpec: &v1alpha1.WorkflowSpec{
Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
v1alpha1.StartNodeID: {},
},
},
}

assert.Error(t, wExec.HandleAbortedWorkflow(ctx, w, 5))

assert.Equal(t, uint32(1), w.Status.FailedAttempts)
})
}
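These cases can be run locally with go test -run TestWorkflowExecutor_HandleAbortedWorkflow ./pkg/controller/workflow/... (assuming a checkout of the repository with its Go module dependencies available).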