Skip to content

Commit

Permalink
Minor adustments to service.BatchError
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Oct 6, 2022
1 parent 358df50 commit 68e67c8
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 67 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ All notable changes to this project will be documented in this file.

## 4.9.1 - 2022-10-06

### Added

- Go API: A new `BatchError` type added for distinguishing errors of a given batch.

### Fixed

- Rolled back `kafka` input and output underlying sarama client library to fix a regression introduced in 4.9.0 😅 where `invalid configuration (Consumer.Group.Rebalance.GroupStrategies and Consumer.Group.Rebalance.Strategy cannot be set at the same time)` errors would prevent consumption under certain configurations. We've decided to roll back rather than upgrade as a breaking API change was introduced that could cause issues for Go API importers (more info here: https://github.com/Shopify/sarama/issues/2358).
Expand Down
13 changes: 4 additions & 9 deletions internal/batch/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@ func NewError(msg message.Batch, err error) *Error {
}

// Failed stores an error state for a particular message of a batch. Returns a
// pointer to the underlying error, allowing with method to be chained.
// pointer to the underlying error, allowing the method to be chained.
//
// If Failed is not called then all messages are assumed to have failed. If it
// is called at least once then all message indexes that aren't explicitly
// failed are assumed to have been processed successfully.
func (e *Error) Failed(i int, err error) *Error {
if len(e.source) <= i {
return e
}
if e.partErrors == nil {
e.partErrors = make(map[int]error)
}
Expand All @@ -48,14 +51,6 @@ func (e *Error) IndexedErrors() int {
return len(e.partErrors)
}

// WalkableError is an interface implemented by batch errors that allows you to
// walk the messages of the batch and dig into the individual errors.
type WalkableError interface {
WalkParts(fn func(int, *message.Part, error) bool)
IndexedErrors() int
error
}

// WalkParts applies a closure to each message that was part of the request that
// caused this error. The closure is provided the message part index, a pointer
// to the part, and its individual error, which may be nil if the message itself
Expand Down
2 changes: 1 addition & 1 deletion internal/component/input/async_preserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (p *AsyncPreserver) wrapBatchAckFn(m asyncPreserverResend) (message.Batch,
return trackedMsg, func(ctx context.Context, res error) error {
if res != nil {
resendMsg := m.msg
if walkable, ok := res.(batch.WalkableError); ok && walkable.IndexedErrors() < m.msg.Len() {
if walkable, ok := res.(*batch.Error); ok && walkable.IndexedErrors() < m.msg.Len() {
resendMsg = message.QuickBatch(nil)
walkable.WalkParts(func(i int, p *message.Part, e error) bool {
if e == nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/component/output/not_batched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func TestNotBatchedBreakOutMessagesErrors(t *testing.T) {
err := res
require.Error(t, err)

walkable, ok := err.(batch.WalkableError)
walkable, ok := err.(*batch.Error)
require.True(t, ok)

errs := map[int]string{}
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestNotBatchedBreakOutMessagesErrorsAsync(t *testing.T) {
err := res
require.Error(t, err)

walkable, ok := err.(batch.WalkableError)
walkable, ok := err.(*batch.Error)
require.True(t, ok)

errs := map[int]string{}
Expand Down
4 changes: 2 additions & 2 deletions internal/transaction/tracked.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (t *Tracked) Message() message.Batch {
return t.msg
}

func (t *Tracked) getResFromGroup(walkable batch.WalkableError) error {
func (t *Tracked) getResFromGroup(walkable *batch.Error) error {
remainingIndexes := make(map[int]struct{}, t.msg.Len())
for i := 0; i < t.msg.Len(); i++ {
remainingIndexes[i] = struct{}{}
Expand Down Expand Up @@ -68,7 +68,7 @@ func (t *Tracked) getResFromGroup(walkable batch.WalkableError) error {

func (t *Tracked) resFromError(err error) error {
if err != nil {
if walkable, ok := err.(batch.WalkableError); ok {
if walkable, ok := err.(*batch.Error); ok {
err = t.getResFromGroup(walkable)
}
}
Expand Down
72 changes: 48 additions & 24 deletions public/service/errors.go
Original file line number Diff line number Diff line change
@@ -1,60 +1,84 @@
package service

import (
"errors"

"github.com/benthosdev/benthos/v4/internal/batch"
"github.com/benthosdev/benthos/v4/internal/message"
)

// WalkableError groups the errors that were encountered while processing a
// BatchError groups the errors that were encountered while processing a
// collection (usually a batch) of messages and provides methods to iterate
// over these errors.
type WalkableError struct {
wrapped batch.WalkableError
type BatchError struct {
wrapped *batch.Error
}

// MockWalkableError creates a WalkableError that can be used for testing
// purposes. The list of message errors can contain nil values to skip over
// messages that shouldn't have errors. Additionally, the list of errors does
// not have to be the same length as the number of messages in the batch. If
// there are more errors than batch messages, then the extra errors are
// discarded.
func MockWalkableError(b MessageBatch, headline error, messageErrors ...error) *WalkableError {
// NewBatchError creates a BatchError that can be returned by batched outputs.
// The advantage of doing so is that nacks and retries can potentially be
// granularly dealt out in cases where only a subset of the batch failed.
//
// A headline error must be supplied which will be exposed when upstream
// components do not support granular batch errors.
func NewBatchError(b MessageBatch, headline error) *BatchError {
ibatch := make(message.Batch, len(b))
for i, m := range b {
ibatch[i] = m.part
}

batchErr := batch.NewError(ibatch, headline)
for i, merr := range messageErrors {
if i >= len(b) {
break
}
if merr != nil {
batchErr.Failed(i, merr)
}
}
return &BatchError{wrapped: batchErr}
}

return &WalkableError{wrapped: batchErr}
// Failed stores an error state for a particular message of a batch. Returns a
// pointer to the underlying error, allowing the method to be chained.
//
// If Failed is not called then all messages are assumed to have failed. If it
// is called at least once then all message indexes that aren't explicitly
// failed are assumed to have been processed successfully.
func (err *BatchError) Failed(i int, merr error) *BatchError {
_ = err.wrapped.Failed(i, merr)
return err
}

// WalkMessages applies a closure to each message that was part of the request
// that caused this error. The closure is provided the message index, a pointer
// to the message, and its individual error, which may be nil if the message
// itself was processed successfully. The closure returns a bool which indicates
// whether the iteration should be continued.
func (err *WalkableError) WalkMessages(fn func(int, *Message, error) bool) {
// itself was processed successfully. The closure should return a bool which
// indicates whether the iteration should be continued.
func (err *BatchError) WalkMessages(fn func(int, *Message, error) bool) {
err.wrapped.WalkParts(func(i int, p *message.Part, err error) bool {
return fn(i, &Message{part: p}, err)
})
}

// IndexedErrors returns the number of indexed errors that have been registered
// within a walkable error.
func (err *WalkableError) IndexedErrors() int {
func (err *BatchError) IndexedErrors() int {
return err.wrapped.IndexedErrors()
}

// Error returns the underlying error message
func (err *WalkableError) Error() string {
func (err *BatchError) Error() string {
return err.wrapped.Error()
}

// If the provided error is not nil and can be cast to an internal batch error
// we return a public batch error.
func toPublicBatchError(err error) error {
var bErr *batch.Error
if err != nil && errors.As(err, &bErr) {
err = &BatchError{wrapped: bErr}
}
return err
}

// If the provided error is not nil and can be cast to a public batch error we
// return the internal batch error.
func fromPublicBatchError(err error) error {
var bErr *BatchError
if err != nil && errors.As(err, &bErr) {
err = bErr.wrapped
}
return err
}
30 changes: 13 additions & 17 deletions public/service/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ func TestMockWalkableError(t *testing.T) {
}

batchError := errors.New("simulated error")
err := MockWalkableError(batch, batchError,
errors.New("a error"),
errors.New("b error"),
errors.New("c error"),
)
err := NewBatchError(batch, batchError).
Failed(0, errors.New("a error")).
Failed(1, errors.New("b error")).
Failed(2, errors.New("c error"))

require.Equal(t, err.IndexedErrors(), len(batch), "indexed errors did not match size of batch")
require.Equal(t, err.Error(), batchError.Error(), "headline error is not propagated")
Expand All @@ -40,14 +39,13 @@ func TestMockWalkableError_ExcessErrors(t *testing.T) {
}

batchError := errors.New("simulated error")
err := MockWalkableError(batch, batchError,
errors.New("a error"),
errors.New("b error"),
errors.New("c error"),
errors.New("d error"),
)
err := NewBatchError(batch, batchError).
Failed(0, errors.New("a error")).
Failed(1, errors.New("b error")).
Failed(2, errors.New("c error")).
Failed(3, errors.New("d error"))

require.Equal(t, err.IndexedErrors(), len(batch), "indexed errors did not match size of batch")
require.Equal(t, len(batch), err.IndexedErrors(), "indexed errors did not match size of batch")
}

func TestMockWalkableError_OmitSuccessfulMessages(t *testing.T) {
Expand All @@ -58,11 +56,9 @@ func TestMockWalkableError_OmitSuccessfulMessages(t *testing.T) {
}

batchError := errors.New("simulated error")
err := MockWalkableError(batch, batchError,
errors.New("a error"),
nil,
errors.New("c error"),
)
err := NewBatchError(batch, batchError).
Failed(0, errors.New("a error")).
Failed(2, errors.New("c error"))

require.Equal(t, err.IndexedErrors(), 2, "indexed errors did not match size of batch")
}
20 changes: 10 additions & 10 deletions public/service/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"

ibatch "github.com/benthosdev/benthos/v4/internal/batch"
"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/input"
"github.com/benthosdev/benthos/v4/internal/message"
Expand Down Expand Up @@ -171,13 +170,8 @@ func (a *airGapBatchReader) ReadBatch(ctx context.Context) (message.Batch, input
mBatch[i] = p.part
}
return mBatch, func(c context.Context, r error) error {
werr, isWalkableErr := r.(ibatch.WalkableError)
wrappedResult := r
if isWalkableErr {
wrappedResult = &WalkableError{wrapped: werr}
}

return ackFn(c, wrappedResult)
r = toPublicBatchError(r)
return ackFn(c, r)
}, nil
}

Expand Down Expand Up @@ -220,7 +214,10 @@ func (r *ResourceInput) ReadBatch(ctx context.Context) (MessageBatch, AckFunc, e
b = append(b, newMessageFromPart(part))
return nil
})
return b, tran.Ack, nil
return b, func(c context.Context, r error) error {
r = fromPublicBatchError(r)
return tran.Ack(c, r)
}, nil
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -257,7 +254,10 @@ func (o *OwnedInput) ReadBatch(ctx context.Context) (MessageBatch, AckFunc, erro
b = append(b, newMessageFromPart(part))
return nil
})
return b, tran.Ack, nil
return b, func(c context.Context, r error) error {
r = fromPublicBatchError(r)
return tran.Ack(c, r)
}, nil
}

// Close the input.
Expand Down
5 changes: 3 additions & 2 deletions public/service/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (a *airGapBatchWriter) WriteBatch(ctx context.Context, msg message.Batch) e
if err != nil && errors.Is(err, ErrNotConnected) {
err = component.ErrNotConnected
}
err = fromPublicBatchError(err)
return err
}

Expand Down Expand Up @@ -153,7 +154,7 @@ func (o *ResourceOutput) WriteBatch(ctx context.Context, b MessageBatch) error {
for i, m := range b {
payload[i] = m.part
}
return o.writeMsg(ctx, payload)
return toPublicBatchError(o.writeMsg(ctx, payload))
}

func (o *ResourceOutput) writeMsg(ctx context.Context, payload message.Batch) error {
Expand Down Expand Up @@ -233,7 +234,7 @@ func (o *OwnedOutput) WriteBatch(ctx context.Context, b MessageBatch) error {

select {
case res := <-resChan:
return res
return toPublicBatchError(res)
case <-ctx.Done():
return ctx.Err()
}
Expand Down

0 comments on commit 68e67c8

Please sign in to comment.