Skip to content

Commit

Permalink
FlytePropeller should ignore if Admin is already in terminal state #p…
Browse files Browse the repository at this point in the history
…atch (flyteorg#281)

* FlytePropeller should ignore if Admin is already in terminal state

Signed-off-by: Ketan Umare <[email protected]>

* Lint fixes

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Jul 7, 2021
1 parent 3049039 commit ff8e935
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pkg/controller/nodes/core_phase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package nodes

import "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

// IsTerminalNodePhase returns true if node phase is one of the terminal phases, else false
func IsTerminalNodePhase(p core.NodeExecution_Phase) bool {
return p == core.NodeExecution_ABORTED || p == core.NodeExecution_FAILED || p == core.NodeExecution_SKIPPED || p == core.NodeExecution_SUCCEEDED || p == core.NodeExecution_TIMED_OUT
}

// IsTerminalTaskPhase returns true if task phase is terminal, else false
func IsTerminalTaskPhase(p core.TaskExecution_Phase) bool {
return p == core.TaskExecution_ABORTED || p == core.TaskExecution_FAILED || p == core.TaskExecution_SUCCEEDED
}
50 changes: 50 additions & 0 deletions pkg/controller/nodes/core_phase_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package nodes

import (
"testing"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
)

func TestIsTerminalNodePhase(t *testing.T) {
tests := []struct {
name string
p core.NodeExecution_Phase
want bool
}{
{"aborted", core.NodeExecution_ABORTED, true},
{"succeeded", core.NodeExecution_SUCCEEDED, true},
{"failed", core.NodeExecution_FAILED, true},
{"timed_out", core.NodeExecution_TIMED_OUT, true},
{"skipped", core.NodeExecution_SKIPPED, true},
{"running", core.NodeExecution_RUNNING, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsTerminalNodePhase(tt.p); got != tt.want {
t.Errorf("IsTerminalNodePhase() = %v, want %v", got, tt.want)
}
})
}
}

func TestIsTerminalTaskPhase(t *testing.T) {
tests := []struct {
name string
p core.TaskExecution_Phase
want bool
}{
{"aborted", core.TaskExecution_ABORTED, true},
{"failed", core.TaskExecution_FAILED, true},
{"succeeded", core.TaskExecution_SUCCEEDED, true},
{"running", core.TaskExecution_RUNNING, false},
{"waiting", core.TaskExecution_WAITING_FOR_RESOURCES, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsTerminalTaskPhase(tt.p); got != tt.want {
t.Errorf("IsTerminalTaskPhase() = %v, want %v", got, tt.want)
}
})
}
}
6 changes: 6 additions & 0 deletions pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ func (c *nodeExecutor) IdempotentRecordEvent(ctx context.Context, nodeEvent *eve
nodeEvent.Phase.String(), nodeEvent.GetId().NodeId)
return nil
} else if eventsErr.IsEventAlreadyInTerminalStateError(err) {
if IsTerminalNodePhase(nodeEvent.Phase) {
// Event was trying to record a different terminal phase for an already terminal event. ignoring.
logger.Infof(ctx, "Node event phase: %s, nodeId %s already in terminal phase. err: %s",
nodeEvent.Phase.String(), nodeEvent.GetId().NodeId, err.Error())
return nil
}
logger.Warningf(ctx, "Failed to record nodeEvent, error [%s]", err.Error())
return errors.Wrapf(errors.IllegalStateError, nodeEvent.Id.NodeId, err, "phase mis-match mismatch between propeller and control plane; Trying to record Node p: %s", nodeEvent.Phase)
}
Expand Down
47 changes: 47 additions & 0 deletions pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

eventsErr "github.com/flyteorg/flyteidl/clients/go/events/errors"

"github.com/flyteorg/flyteidl/clients/go/coreutils"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
mocks3 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks"
Expand Down Expand Up @@ -1824,3 +1826,48 @@ func TestNodeExecutor_RecursiveNodeHandler_ParallelismLimit(t *testing.T) {
assert.Equal(t, s.NodePhase.String(), executors.NodePhaseSuccess.String())
})
}

type fakeNodeEventRecorder struct {
err error
}

func (f fakeNodeEventRecorder) RecordNodeEvent(ctx context.Context, event *event.NodeExecutionEvent) error {
if f.err != nil {
return f.err
}
return nil
}

func Test_nodeExecutor_IdempotentRecordEvent(t *testing.T) {
noErrRecorder := fakeNodeEventRecorder{}
alreadyExistsError := fakeNodeEventRecorder{&eventsErr.EventError{Code: eventsErr.AlreadyExists, Cause: fmt.Errorf("err")}}
inTerminalError := fakeNodeEventRecorder{&eventsErr.EventError{Code: eventsErr.EventAlreadyInTerminalStateError, Cause: fmt.Errorf("err")}}
otherError := fakeNodeEventRecorder{&eventsErr.EventError{Code: eventsErr.ResourceExhausted, Cause: fmt.Errorf("err")}}

tests := []struct {
name string
rec events.NodeEventRecorder
p core.NodeExecution_Phase
wantErr bool
}{
{"aborted-success", noErrRecorder, core.NodeExecution_ABORTED, false},
{"aborted-failure", otherError, core.NodeExecution_ABORTED, true},
{"aborted-already", alreadyExistsError, core.NodeExecution_ABORTED, false},
{"aborted-terminal", inTerminalError, core.NodeExecution_ABORTED, false},
{"running-terminal", inTerminalError, core.NodeExecution_RUNNING, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &nodeExecutor{
nodeRecorder: tt.rec,
}
ev := &event.NodeExecutionEvent{
Id: &core.NodeExecutionIdentifier{},
Phase: tt.p,
}
if err := c.IdempotentRecordEvent(context.TODO(), ev); (err != nil) != tt.wantErr {
t.Errorf("IdempotentRecordEvent() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
5 changes: 5 additions & 0 deletions pkg/controller/nodes/task_event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ func (t taskEventRecorder) RecordTaskEvent(ctx context.Context, ev *event.TaskEx
logger.Warningf(ctx, "Failed to record taskEvent, error [%s]. Trying to record state: %s. Ignoring this error!", err.Error(), ev.Phase)
return nil
} else if eventsErr.IsEventAlreadyInTerminalStateError(err) {
if IsTerminalTaskPhase(ev.Phase) {
// Event is terminal and the stored value in flyteadmin is already terminal. This implies aborted case. So ignoring
logger.Warningf(ctx, "Failed to record taskEvent, error [%s]. Trying to record state: %s. Ignoring this error!", err.Error(), ev.Phase)
return nil
}
logger.Warningf(ctx, "Failed to record taskEvent in state: %s, error: %s", ev.Phase, err)
return errors.Wrapf(err, "failed to record task event, as it already exists in terminal state. Event state: %s", ev.Phase)
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/controller/nodes/task_event_recorder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package nodes

import (
"context"
"fmt"
"testing"

"github.com/flyteorg/flyteidl/clients/go/events"
eventsErr "github.com/flyteorg/flyteidl/clients/go/events/errors"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
)

type fakeTaskEventsRecorder struct {
err error
}

func (f fakeTaskEventsRecorder) RecordTaskEvent(ctx context.Context, event *event.TaskExecutionEvent) error {
if f.err != nil {
return f.err
}
return nil
}

func Test_taskEventRecorder_RecordTaskEvent(t1 *testing.T) {
noErrRecorder := fakeTaskEventsRecorder{}
alreadyExistsError := fakeTaskEventsRecorder{&eventsErr.EventError{Code: eventsErr.AlreadyExists, Cause: fmt.Errorf("err")}}
inTerminalError := fakeTaskEventsRecorder{&eventsErr.EventError{Code: eventsErr.EventAlreadyInTerminalStateError, Cause: fmt.Errorf("err")}}
otherError := fakeTaskEventsRecorder{&eventsErr.EventError{Code: eventsErr.ResourceExhausted, Cause: fmt.Errorf("err")}}

tests := []struct {
name string
rec events.TaskEventRecorder
p core.TaskExecution_Phase
wantErr bool
}{
{"aborted-success", noErrRecorder, core.TaskExecution_ABORTED, false},
{"aborted-failure", otherError, core.TaskExecution_ABORTED, true},
{"aborted-already", alreadyExistsError, core.TaskExecution_ABORTED, false},
{"aborted-terminal", inTerminalError, core.TaskExecution_ABORTED, false},
{"running-terminal", inTerminalError, core.TaskExecution_RUNNING, true},
}
for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) {
t := taskEventRecorder{
TaskEventRecorder: tt.rec,
}
ev := &event.TaskExecutionEvent{
Phase: tt.p,
}
if err := t.RecordTaskEvent(context.TODO(), ev); (err != nil) != tt.wantErr {
t1.Errorf("RecordTaskEvent() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
4 changes: 4 additions & 0 deletions pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ func (c *workflowExecutor) TransitionToPhase(ctx context.Context, execID *core.W
}

if recordingErr := c.IdempotentReportEvent(ctx, wfEvent); recordingErr != nil {
if eventsErr.IsAlreadyExists(recordingErr) {
logger.Warningf(ctx, "Failed to record workflowEvent, error [%s]. Trying to record state: %s. Ignoring this error!", recordingErr.Error(), wfEvent.Phase)
return nil
}
if eventsErr.IsEventAlreadyInTerminalStateError(recordingErr) {
// Move to WorkflowPhaseFailed for state mis-match
msg := fmt.Sprintf("workflow state mismatch between propeller and control plane; Propeller State: %s, ExecutionId %s", wfEvent.Phase.String(), wfEvent.ExecutionId)
Expand Down

0 comments on commit ff8e935

Please sign in to comment.