Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] array node eventing bump version #5680

Merged
merged 3 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion flytepropeller/events/admin_eventsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ func (s *adminEventSink) Sink(ctx context.Context, message proto.Message) error

if s.filter.Contains(ctx, id) {
logger.Debugf(ctx, "event '%s' has already been sent", string(id))
return nil
return &errors.EventError{
Code: errors.AlreadyExists,
Cause: fmt.Errorf("event has already been sent"),
Message: "Event Already Exists",
}
}

// Validate submission with rate limiter and send admin event
Expand Down
9 changes: 6 additions & 3 deletions flytepropeller/events/admin_eventsink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,16 @@ func TestAdminFilterContains(t *testing.T) {
filter.OnContainsMatch(mock.Anything, mock.Anything).Return(true)

wfErr := adminEventSink.Sink(ctx, wfEvent)
assert.NoError(t, wfErr)
assert.Error(t, wfErr)
assert.True(t, errors.IsAlreadyExists(wfErr))

nodeErr := adminEventSink.Sink(ctx, nodeEvent)
assert.NoError(t, nodeErr)
assert.Error(t, nodeErr)
assert.True(t, errors.IsAlreadyExists(nodeErr))

taskErr := adminEventSink.Sink(ctx, taskEvent)
assert.NoError(t, taskErr)
assert.Error(t, taskErr)
assert.True(t, errors.IsAlreadyExists(taskErr))
}

func TestIDFromMessage(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion flytepropeller/events/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
}

func (r EventError) Error() string {
return fmt.Sprintf("%s: %s, caused by [%s]", r.Code, r.Message, r.Cause.Error())
var cause string
if r.Cause != nil {
cause = r.Cause.Error()

Check warning on line 38 in flytepropeller/events/errors/errors.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/events/errors/errors.go#L36-L38

Added lines #L36 - L38 were not covered by tests
}
return fmt.Sprintf("%s: %s, caused by [%s]", r.Code, r.Message, cause)

Check warning on line 40 in flytepropeller/events/errors/errors.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/events/errors/errors.go#L40

Added line #L40 was not covered by tests
}

func (r *EventError) Is(target error) bool {
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ const (
// EventConfig groups the settings that control how execution events are
// emitted. Each field is exposed as a JSON key and as a command-line flag
// through its struct tags.
type EventConfig struct {
// RawOutputPolicy selects how output data should be passed along in
// execution events.
RawOutputPolicy RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."`
// FallbackToOutputReference reports whether output data should be sent by
// reference when it is too large to be sent inline in execution events.
FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."`
// ErrorOnAlreadyExists reports whether an error should be returned when an
// event already exists. The zero value (false) is the default.
ErrorOnAlreadyExists bool `json:"error-on-already-exists" pflag:",Whether to return an error when an event already exists."`
}

// ParallelismBehavior defines how ArrayNode should handle subNode parallelism by default
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions flytepropeller/pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 46 additions & 5 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/ioutils"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/array/errorcollector"
"github.com/flyteorg/flyte/flytepropeller/events"
eventsErr "github.com/flyteorg/flyte/flytepropeller/events/errors"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/validators"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
Expand All @@ -21,6 +22,7 @@
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/k8s"
"github.com/flyteorg/flyte/flytestdlib/bitarray"
stdConfig "github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
Expand Down Expand Up @@ -112,6 +114,10 @@

// update state for subNodes
if err := eventRecorder.finalize(ctx, nCtx, idlcore.TaskExecution_ABORTED, 0, a.eventConfig); err != nil {
// a task event with abort phase is already emitted when handling ArrayNodePhaseFailing
if eventsErr.IsAlreadyExists(err) {
return nil

Check warning on line 119 in flytepropeller/pkg/controller/nodes/array/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/handler.go#L118-L119

Added lines #L118 - L119 were not covered by tests
}
logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error())
return err
}
Expand Down Expand Up @@ -579,12 +585,35 @@

// increment taskPhaseVersion if we detect any changes in subNode state.
if incrementTaskPhaseVersion {
arrayNodeState.TaskPhaseVersion = arrayNodeState.TaskPhaseVersion + 1
arrayNodeState.TaskPhaseVersion++
}

if err := eventRecorder.finalize(ctx, nCtx, taskPhase, arrayNodeState.TaskPhaseVersion, a.eventConfig); err != nil {
logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error())
return handler.UnknownTransition, err
const maxRetries = 3
retries := 0
for retries <= maxRetries {
err := eventRecorder.finalize(ctx, nCtx, taskPhase, arrayNodeState.TaskPhaseVersion, a.eventConfig)

if err == nil {
break
}

// Handle potential race condition if FlyteWorkflow CRD fails to get synced
if eventsErr.IsAlreadyExists(err) {
if !incrementTaskPhaseVersion {
break

Check warning on line 603 in flytepropeller/pkg/controller/nodes/array/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/handler.go#L603

Added line #L603 was not covered by tests
}
logger.Warnf(ctx, "Event version already exists, bumping version and retrying (%d/%d): [%s]", retries+1, maxRetries, err.Error())
arrayNodeState.TaskPhaseVersion++
} else {
logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error())
return handler.UnknownTransition, err
}

retries++
if retries > maxRetries {
logger.Errorf(ctx, "ArrayNode event recording failed after %d retries: [%s]", maxRetries, err.Error())
return handler.UnknownTransition, err
}
}

// if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0
Expand Down Expand Up @@ -632,9 +661,21 @@
return nil, err
}

eventConfigCopy, err := stdConfig.DeepCopyConfig(eventConfig)
if err != nil {
return nil, err

Check warning on line 666 in flytepropeller/pkg/controller/nodes/array/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/handler.go#L666

Added line #L666 was not covered by tests
}

deepCopiedEventConfig, ok := eventConfigCopy.(*config.EventConfig)
if !ok {
return nil, fmt.Errorf("deep copy error: expected *config.EventConfig, but got %T", eventConfigCopy)

Check warning on line 671 in flytepropeller/pkg/controller/nodes/array/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/handler.go#L671

Added line #L671 was not covered by tests
}

deepCopiedEventConfig.ErrorOnAlreadyExists = true

arrayScope := scope.NewSubScope("array")
return &arrayNodeHandler{
eventConfig: eventConfig,
eventConfig: deepCopiedEventConfig,
gatherOutputsRequestChannel: make(chan *gatherOutputsRequest),
metrics: newMetrics(arrayScope),
nodeExecutionRequestChannel: make(chan *nodeExecutionRequest),
Expand Down
Loading
Loading