Skip to content

Commit

Permalink
Merge pull request #1485 from disintegrator/walkable-error-api
Browse files Browse the repository at this point in the history
expose `service.WalkableError` public API
  • Loading branch information
Jeffail authored Oct 6, 2022
2 parents 9f54828 + 67996d5 commit 358df50
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 1 deletion.
60 changes: 60 additions & 0 deletions public/service/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package service

import (
"github.com/benthosdev/benthos/v4/internal/batch"
"github.com/benthosdev/benthos/v4/internal/message"
)

// WalkableError groups the errors that were encountered while processing a
// collection (usually a batch) of messages and provides methods to iterate
// over these errors.
type WalkableError struct {
wrapped batch.WalkableError
}

// MockWalkableError creates a WalkableError that can be used for testing
// purposes. The list of message errors can contain nil values to skip over
// messages that shouldn't have errors. Additionally, the list of errors does
// not have to be the same length as the number of messages in the batch. If
// there are more errors than batch messages, then the extra errors are
// discarded.
func MockWalkableError(b MessageBatch, headline error, messageErrors ...error) *WalkableError {
ibatch := make(message.Batch, len(b))
for i, m := range b {
ibatch[i] = m.part
}

batchErr := batch.NewError(ibatch, headline)
for i, merr := range messageErrors {
if i >= len(b) {
break
}
if merr != nil {
batchErr.Failed(i, merr)
}
}

return &WalkableError{wrapped: batchErr}
}

// WalkMessages applies a closure to each message that was part of the request
// that caused this error. The closure is provided the message index, a pointer
// to the message, and its individual error, which may be nil if the message
// itself was processed successfully. The closure returns a bool which indicates
// whether the iteration should be continued.
func (err *WalkableError) WalkMessages(fn func(int, *Message, error) bool) {
err.wrapped.WalkParts(func(i int, p *message.Part, err error) bool {
return fn(i, &Message{part: p}, err)
})
}

// IndexedErrors returns the number of indexed errors that have been registered
// within a walkable error.
func (err *WalkableError) IndexedErrors() int {
return err.wrapped.IndexedErrors()
}

// Error returns the underlying error message
func (err *WalkableError) Error() string {
return err.wrapped.Error()
}
68 changes: 68 additions & 0 deletions public/service/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package service

import (
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestMockWalkableError(t *testing.T) {
batch := MessageBatch{
NewMessage([]byte("a")),
NewMessage([]byte("b")),
NewMessage([]byte("c")),
}

batchError := errors.New("simulated error")
err := MockWalkableError(batch, batchError,
errors.New("a error"),
errors.New("b error"),
errors.New("c error"),
)

require.Equal(t, err.IndexedErrors(), len(batch), "indexed errors did not match size of batch")
require.Equal(t, err.Error(), batchError.Error(), "headline error is not propagated")
err.WalkMessages(func(i int, m *Message, err error) bool {
bs, berr := m.AsBytes()
require.NoErrorf(t, berr, "could not get bytes from message at %d", i)
require.Equal(t, err.Error(), fmt.Sprintf("%s error", bs))
return true
})
}

func TestMockWalkableError_ExcessErrors(t *testing.T) {
batch := MessageBatch{
NewMessage([]byte("a")),
NewMessage([]byte("b")),
NewMessage([]byte("c")),
}

batchError := errors.New("simulated error")
err := MockWalkableError(batch, batchError,
errors.New("a error"),
errors.New("b error"),
errors.New("c error"),
errors.New("d error"),
)

require.Equal(t, err.IndexedErrors(), len(batch), "indexed errors did not match size of batch")
}

func TestMockWalkableError_OmitSuccessfulMessages(t *testing.T) {
batch := MessageBatch{
NewMessage([]byte("a")),
NewMessage([]byte("b")),
NewMessage([]byte("c")),
}

batchError := errors.New("simulated error")
err := MockWalkableError(batch, batchError,
errors.New("a error"),
nil,
errors.New("c error"),
)

require.Equal(t, err.IndexedErrors(), 2, "indexed errors did not match size of batch")
}
9 changes: 8 additions & 1 deletion public/service/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"

ibatch "github.com/benthosdev/benthos/v4/internal/batch"
"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/input"
"github.com/benthosdev/benthos/v4/internal/message"
Expand Down Expand Up @@ -170,7 +171,13 @@ func (a *airGapBatchReader) ReadBatch(ctx context.Context) (message.Batch, input
mBatch[i] = p.part
}
return mBatch, func(c context.Context, r error) error {
return ackFn(c, r)
werr, isWalkableErr := r.(ibatch.WalkableError)
wrappedResult := r
if isWalkableErr {
wrappedResult = &WalkableError{wrapped: werr}
}

return ackFn(c, wrappedResult)
}, nil
}

Expand Down

0 comments on commit 358df50

Please sign in to comment.