From ff8e9358570479bdbf7d997fe10c15a66eba3787 Mon Sep 17 00:00:00 2001 From: Ketan Umare <16888709+kumare3@users.noreply.github.com> Date: Wed, 7 Jul 2021 14:24:32 -0700 Subject: [PATCH] FlytePropeller should ignore if Admin is already in terminal state #patch (#281) * FlytePropeller should ignore if Admin is already in terminal state Signed-off-by: Ketan Umare * Lint fixes Signed-off-by: Ketan Umare --- pkg/controller/nodes/core_phase.go | 13 +++++ pkg/controller/nodes/core_phase_test.go | 50 +++++++++++++++++ pkg/controller/nodes/executor.go | 6 ++ pkg/controller/nodes/executor_test.go | 47 ++++++++++++++++ pkg/controller/nodes/task_event_recorder.go | 5 ++ .../nodes/task_event_recorder_test.go | 56 +++++++++++++++++++ pkg/controller/workflow/executor.go | 4 ++ 7 files changed, 181 insertions(+) create mode 100644 pkg/controller/nodes/core_phase.go create mode 100644 pkg/controller/nodes/core_phase_test.go create mode 100644 pkg/controller/nodes/task_event_recorder_test.go diff --git a/pkg/controller/nodes/core_phase.go b/pkg/controller/nodes/core_phase.go new file mode 100644 index 0000000000..7521f90ff8 --- /dev/null +++ b/pkg/controller/nodes/core_phase.go @@ -0,0 +1,13 @@ +package nodes + +import "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + +// IsTerminalNodePhase returns true if node phase is one of the terminal phases, else false +func IsTerminalNodePhase(p core.NodeExecution_Phase) bool { + return p == core.NodeExecution_ABORTED || p == core.NodeExecution_FAILED || p == core.NodeExecution_SKIPPED || p == core.NodeExecution_SUCCEEDED || p == core.NodeExecution_TIMED_OUT +} + +// IsTerminalTaskPhase returns true if task phase is terminal, else false +func IsTerminalTaskPhase(p core.TaskExecution_Phase) bool { + return p == core.TaskExecution_ABORTED || p == core.TaskExecution_FAILED || p == core.TaskExecution_SUCCEEDED +} diff --git a/pkg/controller/nodes/core_phase_test.go b/pkg/controller/nodes/core_phase_test.go new file mode 100644 index 0000000000..3f2598629c --- /dev/null +++ b/pkg/controller/nodes/core_phase_test.go @@ -0,0 +1,50 @@ +package nodes + +import ( + "testing" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" +) + +func TestIsTerminalNodePhase(t *testing.T) { + tests := []struct { + name string + p core.NodeExecution_Phase + want bool + }{ + {"aborted", core.NodeExecution_ABORTED, true}, + {"succeeded", core.NodeExecution_SUCCEEDED, true}, + {"failed", core.NodeExecution_FAILED, true}, + {"timed_out", core.NodeExecution_TIMED_OUT, true}, + {"skipped", core.NodeExecution_SKIPPED, true}, + {"running", core.NodeExecution_RUNNING, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := IsTerminalNodePhase(tt.p); got != tt.want { + t.Errorf("IsTerminalNodePhase() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestIsTerminalTaskPhase(t *testing.T) { + tests := []struct { + name string + p core.TaskExecution_Phase + want bool + }{ + {"aborted", core.TaskExecution_ABORTED, true}, + {"failed", core.TaskExecution_FAILED, true}, + {"succeeded", core.TaskExecution_SUCCEEDED, true}, + {"running", core.TaskExecution_RUNNING, false}, + {"waiting", core.TaskExecution_WAITING_FOR_RESOURCES, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := IsTerminalTaskPhase(tt.p); got != tt.want { + t.Errorf("IsTerminalTaskPhase() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index e751f62d98..f6508ee195 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -133,6 +133,12 @@ func (c *nodeExecutor) IdempotentRecordEvent(ctx context.Context, nodeEvent *eve nodeEvent.Phase.String(), nodeEvent.GetId().NodeId) return nil } else if eventsErr.IsEventAlreadyInTerminalStateError(err) { + if IsTerminalNodePhase(nodeEvent.Phase) { + // Event was trying to record a different terminal phase for an already terminal event. ignoring. + logger.Infof(ctx, "Node event phase: %s, nodeId %s already in terminal phase. err: %s", + nodeEvent.Phase.String(), nodeEvent.GetId().NodeId, err.Error()) + return nil + } logger.Warningf(ctx, "Failed to record nodeEvent, error [%s]", err.Error()) return errors.Wrapf(errors.IllegalStateError, nodeEvent.Id.NodeId, err, "phase mis-match mismatch between propeller and control plane; Trying to record Node p: %s", nodeEvent.Phase) } diff --git a/pkg/controller/nodes/executor_test.go b/pkg/controller/nodes/executor_test.go index b1fa755c70..d2611ba385 100644 --- a/pkg/controller/nodes/executor_test.go +++ b/pkg/controller/nodes/executor_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + eventsErr "github.com/flyteorg/flyteidl/clients/go/events/errors" + "github.com/flyteorg/flyteidl/clients/go/coreutils" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" mocks3 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" @@ -1824,3 +1826,48 @@ func TestNodeExecutor_RecursiveNodeHandler_ParallelismLimit(t *testing.T) { assert.Equal(t, s.NodePhase.String(), executors.NodePhaseSuccess.String()) }) } + +type fakeNodeEventRecorder struct { + err error +} + +func (f fakeNodeEventRecorder) RecordNodeEvent(ctx context.Context, event *event.NodeExecutionEvent) error { + if f.err != nil { + return f.err + } + return nil +} + +func Test_nodeExecutor_IdempotentRecordEvent(t *testing.T) { + noErrRecorder := fakeNodeEventRecorder{} + alreadyExistsError := fakeNodeEventRecorder{&eventsErr.EventError{Code: eventsErr.AlreadyExists, Cause: fmt.Errorf("err")}} + inTerminalError := fakeNodeEventRecorder{&eventsErr.EventError{Code: eventsErr.EventAlreadyInTerminalStateError, Cause: fmt.Errorf("err")}} + otherError := fakeNodeEventRecorder{&eventsErr.EventError{Code: eventsErr.ResourceExhausted, Cause: fmt.Errorf("err")}} + + tests := []struct { + name string + rec events.NodeEventRecorder + p core.NodeExecution_Phase + wantErr bool + }{ + {"aborted-success", noErrRecorder, core.NodeExecution_ABORTED, false}, + {"aborted-failure", otherError, core.NodeExecution_ABORTED, true}, + {"aborted-already", alreadyExistsError, core.NodeExecution_ABORTED, false}, + {"aborted-terminal", inTerminalError, core.NodeExecution_ABORTED, false}, + {"running-terminal", inTerminalError, core.NodeExecution_RUNNING, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &nodeExecutor{ + nodeRecorder: tt.rec, + } + ev := &event.NodeExecutionEvent{ + Id: &core.NodeExecutionIdentifier{}, + Phase: tt.p, + } + if err := c.IdempotentRecordEvent(context.TODO(), ev); (err != nil) != tt.wantErr { + t.Errorf("IdempotentRecordEvent() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/pkg/controller/nodes/task_event_recorder.go b/pkg/controller/nodes/task_event_recorder.go index 4c5d23878b..3a26f48437 100644 --- a/pkg/controller/nodes/task_event_recorder.go +++ b/pkg/controller/nodes/task_event_recorder.go @@ -21,6 +21,11 @@ func (t taskEventRecorder) RecordTaskEvent(ctx context.Context, ev *event.TaskEx logger.Warningf(ctx, "Failed to record taskEvent, error [%s]. Trying to record state: %s. Ignoring this error!", err.Error(), ev.Phase) return nil } else if eventsErr.IsEventAlreadyInTerminalStateError(err) { + if IsTerminalTaskPhase(ev.Phase) { + // Event is terminal and the stored value in flyteadmin is already terminal. This implies aborted case. So ignoring + logger.Warningf(ctx, "Failed to record taskEvent, error [%s]. Trying to record state: %s. Ignoring this error!", err.Error(), ev.Phase) + return nil + } logger.Warningf(ctx, "Failed to record taskEvent in state: %s, error: %s", ev.Phase, err) return errors.Wrapf(err, "failed to record task event, as it already exists in terminal state. Event state: %s", ev.Phase) } diff --git a/pkg/controller/nodes/task_event_recorder_test.go b/pkg/controller/nodes/task_event_recorder_test.go new file mode 100644 index 0000000000..e2558db350 --- /dev/null +++ b/pkg/controller/nodes/task_event_recorder_test.go @@ -0,0 +1,56 @@ +package nodes + +import ( + "context" + "fmt" + "testing" + + "github.com/flyteorg/flyteidl/clients/go/events" + eventsErr "github.com/flyteorg/flyteidl/clients/go/events/errors" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" +) + +type fakeTaskEventsRecorder struct { + err error +} + +func (f fakeTaskEventsRecorder) RecordTaskEvent(ctx context.Context, event *event.TaskExecutionEvent) error { + if f.err != nil { + return f.err + } + return nil +} + +func Test_taskEventRecorder_RecordTaskEvent(t1 *testing.T) { + noErrRecorder := fakeTaskEventsRecorder{} + alreadyExistsError := fakeTaskEventsRecorder{&eventsErr.EventError{Code: eventsErr.AlreadyExists, Cause: fmt.Errorf("err")}} + inTerminalError := fakeTaskEventsRecorder{&eventsErr.EventError{Code: eventsErr.EventAlreadyInTerminalStateError, Cause: fmt.Errorf("err")}} + otherError := fakeTaskEventsRecorder{&eventsErr.EventError{Code: eventsErr.ResourceExhausted, Cause: fmt.Errorf("err")}} + + tests := []struct { + name string + rec events.TaskEventRecorder + p core.TaskExecution_Phase + wantErr bool + }{ + {"aborted-success", noErrRecorder, core.TaskExecution_ABORTED, false}, + {"aborted-failure", otherError, core.TaskExecution_ABORTED, true}, + {"aborted-already", alreadyExistsError, core.TaskExecution_ABORTED, false}, + {"aborted-terminal", inTerminalError, core.TaskExecution_ABORTED, false}, + {"running-terminal", inTerminalError, core.TaskExecution_RUNNING, true}, + } + for _, tt := range tests { + t1.Run(tt.name, func(t1 *testing.T) { + t := taskEventRecorder{ + TaskEventRecorder: tt.rec, + } + ev := &event.TaskExecutionEvent{ + Phase: tt.p, + } + if err := t.RecordTaskEvent(context.TODO(), ev); (err != nil) != tt.wantErr { + t1.Errorf("RecordTaskEvent() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go index a9df79f05c..bc1dfb668c 100644 --- a/pkg/controller/workflow/executor.go +++ b/pkg/controller/workflow/executor.go @@ -325,6 +325,10 @@ func (c *workflowExecutor) TransitionToPhase(ctx context.Context, execID *core.W } if recordingErr := c.IdempotentReportEvent(ctx, wfEvent); recordingErr != nil { + if eventsErr.IsAlreadyExists(recordingErr) { + logger.Warningf(ctx, "Failed to record workflowEvent, error [%s]. Trying to record state: %s. Ignoring this error!", recordingErr.Error(), wfEvent.Phase) + return nil + } if eventsErr.IsEventAlreadyInTerminalStateError(recordingErr) { // Move to WorkflowPhaseFailed for state mis-match msg := fmt.Sprintf("workflow state mismatch between propeller and control plane; Propeller State: %s, ExecutionId %s", wfEvent.Phase.String(), wfEvent.ExecutionId)