diff --git a/events/errors/errors.go b/events/errors/errors.go index 3f71b9c084..e078fb208c 100644 --- a/events/errors/errors.go +++ b/events/errors/errors.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flytestdlib/logger" @@ -18,6 +19,7 @@ const ( ExecutionNotFound ErrorCode = "ExecutionNotFound" ResourceExhausted ErrorCode = "ResourceExhausted" InvalidArgument ErrorCode = "InvalidArgument" + TooLarge ErrorCode = "TooLarge" EventSinkError ErrorCode = "EventSinkError" EventAlreadyInTerminalStateError ErrorCode = "EventAlreadyInTerminalStateError" ) @@ -78,6 +80,10 @@ func WrapError(err error) error { case codes.NotFound: return wrapf(ExecutionNotFound, err, "The execution that the event belongs to does not exist") case codes.ResourceExhausted: + if strings.Contains(statusErr.Message(), "message larger than max") { + return wrapf(TooLarge, err, "Event message exceeds maximum gRPC size limit") + } + return wrapf(ResourceExhausted, err, "Events are sent too often, exceeded the rate limit") case codes.InvalidArgument: return wrapf(InvalidArgument, err, "Invalid fields for event message") @@ -115,6 +121,11 @@ func IsResourceExhausted(err error) bool { return errors.Is(err, &EventError{Code: ResourceExhausted}) } +// Checks if the error is of type EventError and the ErrorCode is of type TooLarge +func IsTooLarge(err error) bool { + return errors.Is(err, &EventError{Code: TooLarge}) +} + // Checks if the error is of type EventError and the ErrorCode is of type EventAlreadyInTerminalStateError func IsEventAlreadyInTerminalStateError(err error) bool { return errors.Is(err, &EventError{Code: EventAlreadyInTerminalStateError}) diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index 71500a105f..697f6076eb 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -576,8 +576,32 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx *node err = c.IdempotentRecordEvent(ctx, nev) if err != nil { - logger.Warningf(ctx, "Failed to record nodeEvent, error [%s]", err.Error()) - return executors.NodeStatusUndefined, errors.Wrapf(errors.EventRecordingFailed, nCtx.NodeID(), err, "failed to record node event") + if eventsErr.IsTooLarge(err) { + // With large enough dynamic task fanouts the reported node event, which contains the compiled + // workflow closure, can exceed the gRPC message size limit. In this case we immediately + // transition the node to failing to abort the workflow. + np = v1alpha1.NodePhaseFailing + p = handler.PhaseInfoFailure(core.ExecutionError_USER, "NodeFailed", err.Error(), p.GetInfo()) + + err = c.IdempotentRecordEvent(ctx, &event.NodeExecutionEvent{ + Id: nCtx.NodeExecutionMetadata().GetNodeExecutionID(), + Phase: core.NodeExecution_FAILED, + OccurredAt: ptypes.TimestampNow(), + OutputResult: &event.NodeExecutionEvent_Error{ + Error: &core.ExecutionError{ + Code: "NodeFailed", + Message: err.Error(), + }, + }, + }) + + if err != nil { + return executors.NodeStatusUndefined, errors.Wrapf(errors.EventRecordingFailed, nCtx.NodeID(), err, "failed to record node event") + } + } else { + logger.Warningf(ctx, "Failed to record nodeEvent, error [%s]", err.Error()) + return executors.NodeStatusUndefined, errors.Wrapf(errors.EventRecordingFailed, nCtx.NodeID(), err, "failed to record node event") + } } // We reach here only when transitioning from Queued to Running. In this case, the startedAt is not set.