Skip to content

Commit

Permalink
Truncate admin event error messages (flyteorg#352)
Browse files Browse the repository at this point in the history
* truncating admin event error messages to 100KB

Signed-off-by: Daniel Rammer <[email protected]>

* added event recorder unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* removed unnecessary message creation loop in event recorder testing

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Oct 25, 2021
1 parent b933238 commit 9147b19
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 4 deletions.
4 changes: 0 additions & 4 deletions events/admin_eventsink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ import (
"google.golang.org/grpc/status"
)

// This test suite uses Mockery to mock the AdminServiceClient. Run the following command in CLI or in the IntelliJ
// IDE "Go Generate File". This will create a mocks/AdminServiceClient.go file
//go:generate mockery -dir ../../../gen/pb-go/flyteidl/service -name AdminServiceClient -output ../admin/mocks

var (
wfEvent = &event.WorkflowExecutionEvent{
ExecutionId: &core.WorkflowExecutionIdentifier{
Expand Down
24 changes: 24 additions & 0 deletions events/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package events

import (
"context"
"fmt"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flytepropeller/events/errors"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
"github.com/golang/protobuf/proto"
)

// maxErrorMessageLength is the maximum size, in bytes, that an execution error message may have when an event is
// recorded. Longer messages are truncated before the event is sent to the sink. The limit is 100KB; it is spelled as
// a product so the unit is evident (the previous literal, 104857600, was 100MB and contradicted its own comment).
const maxErrorMessageLength = 100 * 1024 // 100KB

type recordingMetrics struct {
EventRecordingFailure labeled.StopWatch
EventRecordingSuccess labeled.StopWatch
Expand Down Expand Up @@ -54,17 +58,37 @@ func (r *eventRecorder) sinkEvent(ctx context.Context, event proto.Message) erro
}

// RecordNodeEvent sends a node execution event to the sink. When the event's output result is an error, the error
// message is first truncated in place to at most maxErrorMessageLength bytes.
func (r *eventRecorder) RecordNodeEvent(ctx context.Context, e *event.NodeExecutionEvent) error {
	// Only the error variant of the output result carries a message that may need shortening.
	errorResult, isError := e.GetOutputResult().(*event.NodeExecutionEvent_Error)
	if isError {
		truncateErrorMessage(errorResult.Error, maxErrorMessageLength)
	}

	return r.sinkEvent(ctx, e)
}

// RecordTaskEvent sends a task execution event to the sink. When the event's output result is an error, the error
// message is first truncated in place to at most maxErrorMessageLength bytes.
func (r *eventRecorder) RecordTaskEvent(ctx context.Context, e *event.TaskExecutionEvent) error {
	// Only the error variant of the output result carries a message that may need shortening.
	errorResult, isError := e.GetOutputResult().(*event.TaskExecutionEvent_Error)
	if isError {
		truncateErrorMessage(errorResult.Error, maxErrorMessageLength)
	}

	return r.sinkEvent(ctx, e)
}

// RecordWorkflowEvent sends a workflow execution event to the sink. When the event's output result is an error, the
// error message is first truncated in place to at most maxErrorMessageLength bytes.
func (r *eventRecorder) RecordWorkflowEvent(ctx context.Context, e *event.WorkflowExecutionEvent) error {
	// Only the error variant of the output result carries a message that may need shortening.
	errorResult, isError := e.GetOutputResult().(*event.WorkflowExecutionEvent_Error)
	if isError {
		truncateErrorMessage(errorResult.Error, maxErrorMessageLength)
	}

	return r.sinkEvent(ctx, e)
}

// truncateErrorMessage shortens err.Message in place when it is longer than length bytes, to mitigate the grpc
// message size limit. The byte budget is split equally between the beginning and the end of the message, since those
// parts tend to carry the most relevant information; the middle is dropped.
//
// The resulting message is never longer than length bytes. It may be slightly shorter: when a cut point falls inside
// a multi-byte UTF-8 sequence the cut is moved outward to the nearest rune boundary, so a valid UTF-8 message stays
// valid (protobuf string fields must contain valid UTF-8). A nil err is ignored and a negative length is treated as 0.
func truncateErrorMessage(err *core.ExecutionError, length int) {
	if err == nil {
		return
	}
	if length < 0 {
		length = 0
	}

	msg := err.Message
	if len(msg) <= length {
		return
	}

	half := length / 2

	// msg[:head] is the retained prefix. Step back while the first dropped byte is a UTF-8 continuation byte
	// (10xxxxxx), otherwise the prefix would end in the middle of a rune. head < len(msg) always holds here.
	head := half
	for head > 0 && msg[head]&0xC0 == 0x80 {
		head--
	}

	// msg[tail:] is the retained suffix. Step forward while it would start on a continuation byte.
	tail := len(msg) - half
	for tail < len(msg) && msg[tail]&0xC0 == 0x80 {
		tail++
	}

	err.Message = fmt.Sprintf("%s%s", msg[:head], msg[tail:])
}

// Construct a new Event Recorder
func NewEventRecorder(eventSink EventSink, scope promutils.Scope) EventRecorder {
recordingScope := scope.NewSubScope("event_recording")
Expand Down
101 changes: 101 additions & 0 deletions events/event_recorder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package events

import (
"context"
"math/rand"
"testing"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flytepropeller/events/mocks"
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

"github.com/stretchr/testify/assert"
)

// workflowEventError is a workflow execution event whose output result is an error with a short message.
var workflowEventError = &event.WorkflowExecutionEvent{
	OutputResult: &event.WorkflowExecutionEvent_Error{
		Error: &core.ExecutionError{Message: "error"},
	},
}

// nodeEventError is a node execution event whose output result is an error with a short message.
var nodeEventError = &event.NodeExecutionEvent{
	OutputResult: &event.NodeExecutionEvent_Error{
		Error: &core.ExecutionError{Message: "error"},
	},
}

// taskEventError is a task execution event whose output result is an error with a short message.
var taskEventError = &event.TaskExecutionEvent{
	OutputResult: &event.TaskExecutionEvent_Error{
		Error: &core.ExecutionError{Message: "error"},
	},
}

// letter is the alphabet createRandomString draws from: ASCII letters and digits.
var letter = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")

// createRandomString returns a string of exactly length characters, each picked at random from letter.
func createRandomString(length int) string {
	picked := make([]rune, 0, length)
	for len(picked) < length {
		picked = append(picked, letter[rand.Intn(len(letter))]) //nolint - ignore weak random
	}
	return string(picked)
}

// TestRecordEvent checks that workflow, node and task events are recorded through a mock sink without error.
func TestRecordEvent(t *testing.T) {
	testScope := promutils.NewTestScope()
	labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey)

	recorder := NewEventRecorder(mocks.NewMockEventSink(), testScope)
	ctx := context.Background()

	assert.NoError(t, recorder.RecordWorkflowEvent(ctx, wfEvent))
	assert.NoError(t, recorder.RecordNodeEvent(ctx, nodeEvent))
	assert.NoError(t, recorder.RecordTaskEvent(ctx, taskEvent))
}

// TestRecordErrorEvent checks that workflow, node and task events whose output result is an error are recorded
// through a mock sink without error.
func TestRecordErrorEvent(t *testing.T) {
	testScope := promutils.NewTestScope()
	labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey)

	recorder := NewEventRecorder(mocks.NewMockEventSink(), testScope)
	ctx := context.Background()

	assert.NoError(t, recorder.RecordWorkflowEvent(ctx, workflowEventError))
	assert.NoError(t, recorder.RecordNodeEvent(ctx, nodeEventError))
	assert.NoError(t, recorder.RecordTaskEvent(ctx, taskEventError))
}

// TestTruncateErrorMessage runs truncateErrorMessage over random ASCII messages both shorter and longer than the
// limit. Besides the size bound it verifies the content: messages within the limit must be left untouched, and longer
// ones must consist of the first and last length/2 bytes of the original. Checking only the size bound would also
// accept an implementation that discards the message entirely.
func TestTruncateErrorMessage(t *testing.T) {
	length := 100
	for i := 0; i <= length*2; i += 5 {
		original := createRandomString(i)
		executionError := core.ExecutionError{
			Message: original,
		}

		truncateErrorMessage(&executionError, length)
		assert.True(t, len(executionError.Message) <= length)

		if len(original) <= length {
			// Nothing to truncate: the message must come back unchanged.
			assert.Equal(t, original, executionError.Message)
			continue
		}

		// createRandomString only emits single-byte characters, so byte offsets are exact here.
		expected := original[:length/2] + original[len(original)-length/2:]
		assert.Equal(t, expected, executionError.Message)
	}
}

0 comments on commit 9147b19

Please sign in to comment.