From 67996d53dbe3276d732b3ff2268e0f2251b14984 Mon Sep 17 00:00:00 2001 From: Georges Haidar Date: Fri, 30 Sep 2022 17:27:32 +0100 Subject: [PATCH] expose service.WalkableError public API This new API allows developers to inspect the individual message errors contained within a batch error. Some inputs can benefit from access to this granular error info to build better ack/nack handling around partial failure scenarios. --- public/service/errors.go | 60 +++++++++++++++++++++++++++++++ public/service/errors_test.go | 68 +++++++++++++++++++++++++++++++++++ public/service/input.go | 9 ++++- 3 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 public/service/errors.go create mode 100644 public/service/errors_test.go diff --git a/public/service/errors.go b/public/service/errors.go new file mode 100644 index 0000000000..842f90117f --- /dev/null +++ b/public/service/errors.go @@ -0,0 +1,60 @@ +package service + +import ( + "github.com/benthosdev/benthos/v4/internal/batch" + "github.com/benthosdev/benthos/v4/internal/message" +) + +// WalkableError groups the errors that were encountered while processing a +// collection (usually a batch) of messages and provides methods to iterate +// over these errors. +type WalkableError struct { + wrapped batch.WalkableError +} + +// MockWalkableError creates a WalkableError that can be used for testing +// purposes. The list of message errors can contain nil values to skip over +// messages that shouldn't have errors. Additionally, the list of errors does +// not have to be the same length as the number of messages in the batch. If +// there are more errors than batch messages, then the extra errors are +// discarded. +func MockWalkableError(b MessageBatch, headline error, messageErrors ...error) *WalkableError { + ibatch := make(message.Batch, len(b)) + for i, m := range b { + ibatch[i] = m.part + } + + batchErr := batch.NewError(ibatch, headline) + for i, merr := range messageErrors { + if i >= len(b) { + break + } + if merr != nil { + batchErr.Failed(i, merr) + } + } + + return &WalkableError{wrapped: batchErr} +} + +// WalkMessages applies a closure to each message that was part of the request +// that caused this error. The closure is provided the message index, a pointer +// to the message, and its individual error, which may be nil if the message +// itself was processed successfully. The closure returns a bool which indicates +// whether the iteration should be continued. +func (err *WalkableError) WalkMessages(fn func(int, *Message, error) bool) { + err.wrapped.WalkParts(func(i int, p *message.Part, err error) bool { + return fn(i, &Message{part: p}, err) + }) +} + +// IndexedErrors returns the number of indexed errors that have been registered +// within a walkable error. +func (err *WalkableError) IndexedErrors() int { + return err.wrapped.IndexedErrors() +} + +// Error returns the underlying error message +func (err *WalkableError) Error() string { + return err.wrapped.Error() +} diff --git a/public/service/errors_test.go b/public/service/errors_test.go new file mode 100644 index 0000000000..aeb7f1d8ab --- /dev/null +++ b/public/service/errors_test.go @@ -0,0 +1,68 @@ +package service + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMockWalkableError(t *testing.T) { + batch := MessageBatch{ + NewMessage([]byte("a")), + NewMessage([]byte("b")), + NewMessage([]byte("c")), + } + + batchError := errors.New("simulated error") + err := MockWalkableError(batch, batchError, + errors.New("a error"), + errors.New("b error"), + errors.New("c error"), + ) + + require.Equal(t, err.IndexedErrors(), len(batch), "indexed errors did not match size of batch") + require.Equal(t, err.Error(), batchError.Error(), "headline error is not propagated") + err.WalkMessages(func(i int, m *Message, err error) bool { + bs, berr := m.AsBytes() + require.NoErrorf(t, berr, "could not get bytes from message at %d", i) + require.Equal(t, err.Error(), fmt.Sprintf("%s error", bs)) + return true + }) +} + +func TestMockWalkableError_ExcessErrors(t *testing.T) { + batch := MessageBatch{ + NewMessage([]byte("a")), + NewMessage([]byte("b")), + NewMessage([]byte("c")), + } + + batchError := errors.New("simulated error") + err := MockWalkableError(batch, batchError, + errors.New("a error"), + errors.New("b error"), + errors.New("c error"), + errors.New("d error"), + ) + + require.Equal(t, err.IndexedErrors(), len(batch), "indexed errors did not match size of batch") +} + +func TestMockWalkableError_OmitSuccessfulMessages(t *testing.T) { + batch := MessageBatch{ + NewMessage([]byte("a")), + NewMessage([]byte("b")), + NewMessage([]byte("c")), + } + + batchError := errors.New("simulated error") + err := MockWalkableError(batch, batchError, + errors.New("a error"), + nil, + errors.New("c error"), + ) + + require.Equal(t, err.IndexedErrors(), 2, "indexed errors did not match size of batch") +} diff --git a/public/service/input.go b/public/service/input.go index 4bffab0a11..7270f531fd 100644 --- a/public/service/input.go +++ b/public/service/input.go @@ -4,6 +4,7 @@ import ( "context" "errors" + ibatch "github.com/benthosdev/benthos/v4/internal/batch" "github.com/benthosdev/benthos/v4/internal/component" "github.com/benthosdev/benthos/v4/internal/component/input" "github.com/benthosdev/benthos/v4/internal/message" @@ -170,7 +171,13 @@ func (a *airGapBatchReader) ReadBatch(ctx context.Context) (message.Batch, input mBatch[i] = p.part } return mBatch, func(c context.Context, r error) error { - return ackFn(c, r) + werr, isWalkableErr := r.(ibatch.WalkableError) + wrappedResult := r + if isWalkableErr { + wrappedResult = &WalkableError{wrapped: werr} + } + + return ackFn(c, wrappedResult) }, nil }