From 68e67c8654a03059eaabd4d90cf5268246c1e6da Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Thu, 6 Oct 2022 13:22:17 +0100 Subject: [PATCH] Minor adjustments to service.BatchError --- CHANGELOG.md | 4 ++ internal/batch/error.go | 13 ++-- internal/component/input/async_preserver.go | 2 +- internal/component/output/not_batched_test.go | 4 +- internal/transaction/tracked.go | 4 +- public/service/errors.go | 72 ++++++++++++------- public/service/errors_test.go | 30 ++++---- public/service/input.go | 20 +++--- public/service/output.go | 5 +- 9 files changed, 87 insertions(+), 67 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cfbdcf85ac..1dcf15ffc9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ All notable changes to this project will be documented in this file. ## 4.9.1 - 2022-10-06 +### Added + +- Go API: A new `BatchError` type added for distinguishing errors of a given batch. + ### Fixed - Rolled back `kafka` input and output underlying sarama client library to fix a regression introduced in 4.9.0 😅 where `invalid configuration (Consumer.Group.Rebalance.GroupStrategies and Consumer.Group.Rebalance.Strategy cannot be set at the same time)` errors would prevent consumption under certain configurations. We've decided to roll back rather than upgrade as a breaking API change was introduced that could cause issues for Go API importers (more info here: https://github.com/Shopify/sarama/issues/2358). diff --git a/internal/batch/error.go b/internal/batch/error.go index 6b1b0a71db..be451f470e 100644 --- a/internal/batch/error.go +++ b/internal/batch/error.go @@ -29,12 +29,15 @@ func NewError(msg message.Batch, err error) *Error { } // Failed stores an error state for a particular message of a batch. Returns a -// pointer to the underlying error, allowing with method to be chained. +// pointer to the underlying error, allowing the method to be chained. // // If Failed is not called then all messages are assumed to have failed. 
If it // is called at least once then all message indexes that aren't explicitly // failed are assumed to have been processed successfully. func (e *Error) Failed(i int, err error) *Error { + if len(e.source) <= i { + return e + } if e.partErrors == nil { e.partErrors = make(map[int]error) } @@ -48,14 +51,6 @@ func (e *Error) IndexedErrors() int { return len(e.partErrors) } -// WalkableError is an interface implemented by batch errors that allows you to -// walk the messages of the batch and dig into the individual errors. -type WalkableError interface { - WalkParts(fn func(int, *message.Part, error) bool) - IndexedErrors() int - error -} - // WalkParts applies a closure to each message that was part of the request that // caused this error. The closure is provided the message part index, a pointer // to the part, and its individual error, which may be nil if the message itself diff --git a/internal/component/input/async_preserver.go b/internal/component/input/async_preserver.go index 98e20f93d5..9298ab9ea5 100644 --- a/internal/component/input/async_preserver.go +++ b/internal/component/input/async_preserver.go @@ -87,7 +87,7 @@ func (p *AsyncPreserver) wrapBatchAckFn(m asyncPreserverResend) (message.Batch, return trackedMsg, func(ctx context.Context, res error) error { if res != nil { resendMsg := m.msg - if walkable, ok := res.(batch.WalkableError); ok && walkable.IndexedErrors() < m.msg.Len() { + if walkable, ok := res.(*batch.Error); ok && walkable.IndexedErrors() < m.msg.Len() { resendMsg = message.QuickBatch(nil) walkable.WalkParts(func(i int, p *message.Part, e error) bool { if e == nil { diff --git a/internal/component/output/not_batched_test.go b/internal/component/output/not_batched_test.go index 47bdbfb04c..16934e2f38 100644 --- a/internal/component/output/not_batched_test.go +++ b/internal/component/output/not_batched_test.go @@ -223,7 +223,7 @@ func TestNotBatchedBreakOutMessagesErrors(t *testing.T) { err := res require.Error(t, err) - walkable, ok := 
err.(batch.WalkableError) + walkable, ok := err.(*batch.Error) require.True(t, ok) errs := map[int]string{} @@ -282,7 +282,7 @@ func TestNotBatchedBreakOutMessagesErrorsAsync(t *testing.T) { err := res require.Error(t, err) - walkable, ok := err.(batch.WalkableError) + walkable, ok := err.(*batch.Error) require.True(t, ok) errs := map[int]string{} diff --git a/internal/transaction/tracked.go b/internal/transaction/tracked.go index 2d65dbd8c4..3ab0c06cd3 100644 --- a/internal/transaction/tracked.go +++ b/internal/transaction/tracked.go @@ -36,7 +36,7 @@ func (t *Tracked) Message() message.Batch { return t.msg } -func (t *Tracked) getResFromGroup(walkable batch.WalkableError) error { +func (t *Tracked) getResFromGroup(walkable *batch.Error) error { remainingIndexes := make(map[int]struct{}, t.msg.Len()) for i := 0; i < t.msg.Len(); i++ { remainingIndexes[i] = struct{}{} @@ -68,7 +68,7 @@ func (t *Tracked) getResFromGroup(walkable batch.WalkableError) error { func (t *Tracked) resFromError(err error) error { if err != nil { - if walkable, ok := err.(batch.WalkableError); ok { + if walkable, ok := err.(*batch.Error); ok { err = t.getResFromGroup(walkable) } } diff --git a/public/service/errors.go b/public/service/errors.go index 842f90117f..ee178ebc1b 100644 --- a/public/service/errors.go +++ b/public/service/errors.go @@ -1,48 +1,52 @@ package service import ( + "errors" + "github.com/benthosdev/benthos/v4/internal/batch" "github.com/benthosdev/benthos/v4/internal/message" ) -// WalkableError groups the errors that were encountered while processing a +// BatchError groups the errors that were encountered while processing a // collection (usually a batch) of messages and provides methods to iterate // over these errors. -type WalkableError struct { - wrapped batch.WalkableError +type BatchError struct { + wrapped *batch.Error } -// MockWalkableError creates a WalkableError that can be used for testing -// purposes. 
The list of message errors can contain nil values to skip over -// messages that shouldn't have errors. Additionally, the list of errors does -// not have to be the same length as the number of messages in the batch. If -// there are more errors than batch messages, then the extra errors are -// discarded. -func MockWalkableError(b MessageBatch, headline error, messageErrors ...error) *WalkableError { +// NewBatchError creates a BatchError that can be returned by batched outputs. +// The advantage of doing so is that nacks and retries can potentially be +// granularly dealt out in cases where only a subset of the batch failed. +// +// A headline error must be supplied which will be exposed when upstream +// components do not support granular batch errors. +func NewBatchError(b MessageBatch, headline error) *BatchError { ibatch := make(message.Batch, len(b)) for i, m := range b { ibatch[i] = m.part } batchErr := batch.NewError(ibatch, headline) - for i, merr := range messageErrors { - if i >= len(b) { - break - } - if merr != nil { - batchErr.Failed(i, merr) - } - } + return &BatchError{wrapped: batchErr} +} - return &WalkableError{wrapped: batchErr} +// Failed stores an error state for a particular message of a batch. Returns a +// pointer to the underlying error, allowing the method to be chained. +// +// If Failed is not called then all messages are assumed to have failed. If it +// is called at least once then all message indexes that aren't explicitly +// failed are assumed to have been processed successfully. +func (err *BatchError) Failed(i int, merr error) *BatchError { + _ = err.wrapped.Failed(i, merr) + return err } // WalkMessages applies a closure to each message that was part of the request // that caused this error. The closure is provided the message index, a pointer // to the message, and its individual error, which may be nil if the message -// itself was processed successfully. 
The closure returns a bool which indicates -// whether the iteration should be continued. -func (err *WalkableError) WalkMessages(fn func(int, *Message, error) bool) { +// itself was processed successfully. The closure should return a bool which +// indicates whether the iteration should be continued. +func (err *BatchError) WalkMessages(fn func(int, *Message, error) bool) { err.wrapped.WalkParts(func(i int, p *message.Part, err error) bool { return fn(i, &Message{part: p}, err) }) @@ -50,11 +54,31 @@ func (err *WalkableError) WalkMessages(fn func(int, *Message, error) bool) { // IndexedErrors returns the number of indexed errors that have been registered // within a walkable error. -func (err *WalkableError) IndexedErrors() int { +func (err *BatchError) IndexedErrors() int { return err.wrapped.IndexedErrors() } // Error returns the underlying error message -func (err *WalkableError) Error() string { +func (err *BatchError) Error() string { return err.wrapped.Error() } + +// If the provided error is not nil and can be cast to an internal batch error +// we return a public batch error. +func toPublicBatchError(err error) error { + var bErr *batch.Error + if err != nil && errors.As(err, &bErr) { + err = &BatchError{wrapped: bErr} + } + return err +} + +// If the provided error is not nil and can be cast to a public batch error we +// return the internal batch error. +func fromPublicBatchError(err error) error { + var bErr *BatchError + if err != nil && errors.As(err, &bErr) { + err = bErr.wrapped + } + return err +} diff --git a/public/service/errors_test.go b/public/service/errors_test.go index aeb7f1d8ab..0c9d7896c5 100644 --- a/public/service/errors_test.go +++ b/public/service/errors_test.go @@ -16,11 +16,10 @@ func TestMockWalkableError(t *testing.T) { } batchError := errors.New("simulated error") - err := MockWalkableError(batch, batchError, - errors.New("a error"), - errors.New("b error"), - errors.New("c error"), - ) + err := NewBatchError(batch, batchError). 
+ Failed(0, errors.New("a error")). + Failed(1, errors.New("b error")). + Failed(2, errors.New("c error")) require.Equal(t, err.IndexedErrors(), len(batch), "indexed errors did not match size of batch") require.Equal(t, err.Error(), batchError.Error(), "headline error is not propagated") @@ -40,14 +39,13 @@ func TestMockWalkableError_ExcessErrors(t *testing.T) { } batchError := errors.New("simulated error") - err := MockWalkableError(batch, batchError, - errors.New("a error"), - errors.New("b error"), - errors.New("c error"), - errors.New("d error"), - ) + err := NewBatchError(batch, batchError). + Failed(0, errors.New("a error")). + Failed(1, errors.New("b error")). + Failed(2, errors.New("c error")). + Failed(3, errors.New("d error")) - require.Equal(t, err.IndexedErrors(), len(batch), "indexed errors did not match size of batch") + require.Equal(t, len(batch), err.IndexedErrors(), "indexed errors did not match size of batch") } func TestMockWalkableError_OmitSuccessfulMessages(t *testing.T) { @@ -58,11 +56,9 @@ func TestMockWalkableError_OmitSuccessfulMessages(t *testing.T) { } batchError := errors.New("simulated error") - err := MockWalkableError(batch, batchError, - errors.New("a error"), - nil, - errors.New("c error"), - ) + err := NewBatchError(batch, batchError). + Failed(0, errors.New("a error")). 
+ Failed(2, errors.New("c error")) require.Equal(t, err.IndexedErrors(), 2, "indexed errors did not match size of batch") } diff --git a/public/service/input.go b/public/service/input.go index 7270f531fd..f1ffc9850a 100644 --- a/public/service/input.go +++ b/public/service/input.go @@ -4,7 +4,6 @@ import ( "context" "errors" - ibatch "github.com/benthosdev/benthos/v4/internal/batch" "github.com/benthosdev/benthos/v4/internal/component" "github.com/benthosdev/benthos/v4/internal/component/input" "github.com/benthosdev/benthos/v4/internal/message" @@ -171,13 +170,8 @@ func (a *airGapBatchReader) ReadBatch(ctx context.Context) (message.Batch, input mBatch[i] = p.part } return mBatch, func(c context.Context, r error) error { - werr, isWalkableErr := r.(ibatch.WalkableError) - wrappedResult := r - if isWalkableErr { - wrappedResult = &WalkableError{wrapped: werr} - } - - return ackFn(c, wrappedResult) + r = toPublicBatchError(r) + return ackFn(c, r) }, nil } @@ -220,7 +214,10 @@ func (r *ResourceInput) ReadBatch(ctx context.Context) (MessageBatch, AckFunc, e b = append(b, newMessageFromPart(part)) return nil }) - return b, tran.Ack, nil + return b, func(c context.Context, r error) error { + r = fromPublicBatchError(r) + return tran.Ack(c, r) + }, nil } //------------------------------------------------------------------------------ @@ -257,7 +254,10 @@ func (o *OwnedInput) ReadBatch(ctx context.Context) (MessageBatch, AckFunc, erro b = append(b, newMessageFromPart(part)) return nil }) - return b, tran.Ack, nil + return b, func(c context.Context, r error) error { + r = fromPublicBatchError(r) + return tran.Ack(c, r) + }, nil } // Close the input. 
diff --git a/public/service/output.go b/public/service/output.go index cbf12bc725..7331aa9fcb 100644 --- a/public/service/output.go +++ b/public/service/output.go @@ -121,6 +121,7 @@ func (a *airGapBatchWriter) WriteBatch(ctx context.Context, msg message.Batch) e if err != nil && errors.Is(err, ErrNotConnected) { err = component.ErrNotConnected } + err = fromPublicBatchError(err) return err } @@ -153,7 +154,7 @@ func (o *ResourceOutput) WriteBatch(ctx context.Context, b MessageBatch) error { for i, m := range b { payload[i] = m.part } - return o.writeMsg(ctx, payload) + return toPublicBatchError(o.writeMsg(ctx, payload)) } func (o *ResourceOutput) writeMsg(ctx context.Context, payload message.Batch) error { @@ -233,7 +234,7 @@ func (o *OwnedOutput) WriteBatch(ctx context.Context, b MessageBatch) error { select { case res := <-resChan: - return res + return toPublicBatchError(res) case <-ctx.Done(): return ctx.Err() }