add onRetryCallback callback function #405

Merged · 4 commits · Apr 3, 2021
26 changes: 21 additions & 5 deletions interceptors/retry/options.go
@@ -26,6 +26,9 @@ var (
 		backoffFunc: BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
 			return BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10)(attempt)
 		}),
+		onRetryCallback: OnRetryCallback(func(ctx context.Context, attempt uint, err error) {
+			logTrace(ctx, "grpc_retry attempt: %d, got err: %v", attempt, err)
+		}),
 	}
 )

@@ -45,6 +48,9 @@ type BackoffFunc func(attempt uint) time.Duration
 // with the next iteration. The context can be used to extract request scoped metadata and context values.
 type BackoffFuncContext func(ctx context.Context, attempt uint) time.Duration
 
+// OnRetryCallback is the type of function called when a retry occurs.
+type OnRetryCallback func(ctx context.Context, attempt uint, err error)
+
 // Disable disables the retry behaviour on this call, or this interceptor.
 //
 // It's semantically the same as `WithMax`
@@ -75,6 +81,15 @@ func WithBackoffContext(bf BackoffFuncContext) CallOption {
 	}}
 }
 
+// WithOnRetryCallback sets the callback to use when a retry occurs.
+//
+// By default, when no callback function is provided, a trace log entry is emitted.
+func WithOnRetryCallback(fn OnRetryCallback) CallOption {
+	return CallOption{applyFunc: func(o *options) {
+		o.onRetryCallback = fn
+	}}
+}
+
 // WithCodes sets which codes should be retried.
 //
 // Please *use with care*, as you may be retrying non-idempotent calls.
@@ -105,11 +120,12 @@ func WithPerRetryTimeout(timeout time.Duration) CallOption {
 }
 
 type options struct {
-	max            uint
-	perCallTimeout time.Duration
-	includeHeader  bool
-	codes          []codes.Code
-	backoffFunc    BackoffFuncContext
+	max             uint
+	perCallTimeout  time.Duration
+	includeHeader   bool
+	codes           []codes.Code
+	backoffFunc     BackoffFuncContext
+	onRetryCallback OnRetryCallback
 }
 
 // CallOption is a grpc.CallOption that is local to grpc_retry.
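For reviewers, a minimal per-call usage sketch of the new option. The `pinger` interface and its `Ping` method are hypothetical stand-ins for a generated gRPC client; only `retry.WithMax` and `retry.WithOnRetryCallback` come from this package, and the v2 module path is assumed:

```go
package main

import (
	"context"
	"fmt"

	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" // assumed module path
	"google.golang.org/grpc"
)

// pinger is a hypothetical stand-in for any generated gRPC client
// method that accepts per-call options.
type pinger interface {
	Ping(ctx context.Context, opts ...grpc.CallOption) error
}

// pingWithRetryReporting overrides the package's default trace-logging
// callback for this single call.
func pingWithRetryReporting(ctx context.Context, c pinger) error {
	return c.Ping(ctx,
		retry.WithMax(3),
		retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
			fmt.Printf("retry #%d after error: %v\n", attempt, err)
		}),
	)
}
```

This works because `CallOption` is a `grpc.CallOption` local to grpc_retry, so the new option can be passed anywhere a generated client accepts call options.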
6 changes: 3 additions & 3 deletions interceptors/retry/retry.go
@@ -48,7 +48,7 @@ func UnaryClientInterceptor(optFuncs ...CallOption) grpc.UnaryClientInterceptor
 			if lastErr == nil {
 				return nil
 			}
-			logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr)
+			callOpts.onRetryCallback(parentCtx, attempt, lastErr)
 			if isContextError(lastErr) {
 				if parentCtx.Err() != nil {
 					logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err())
@@ -110,8 +110,7 @@ func StreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientInterceptor
 				}
 				return retryingStreamer, nil
 			}
-
-			logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr)
+			callOpts.onRetryCallback(parentCtx, attempt, lastErr)
 			if isContextError(lastErr) {
 				if parentCtx.Err() != nil {
 					logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err())
@@ -189,6 +188,7 @@ func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
 		if err := waitRetryBackoff(attempt, s.parentCtx, s.callOpts); err != nil {
 			return err
 		}
+		s.callOpts.onRetryCallback(s.parentCtx, attempt, lastErr)
 		// TODO(bwplotka): Close cancel as it might leak some resources.
 		callCtx, _ := perCallContext(s.parentCtx, s.callOpts, attempt) //nolint
 		newStream, err := s.reestablishStreamAndResendBuffer(callCtx)
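And a connection-wide sketch: installing the unary interceptor with a custom callback so every call on the connection reports retries. The target address is a placeholder and the v2 module path is assumed; `retry.UnaryClientInterceptor`, `retry.WithMax`, and `retry.WithOnRetryCallback` are the APIs shown in this diff:

```go
package main

import (
	"context"
	"log"

	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" // assumed module path
	"google.golang.org/grpc"
)

func main() {
	// CallOptions passed to the interceptor constructor become defaults
	// for every unary call made through this connection.
	conn, err := grpc.Dial("localhost:50051", // placeholder address
		grpc.WithInsecure(),
		grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor(
			retry.WithMax(3),
			retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
				log.Printf("grpc_retry: attempt %d failed: %v", attempt, err)
			}),
		)),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
```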
30 changes: 30 additions & 0 deletions interceptors/retry/retry_test.go
@@ -215,6 +215,21 @@ func (s *RetrySuite) TestUnary_PerCallDeadline_FailsOnParent() {
 	require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failure code must be a gRPC error of Deadline class")
 }
 
+func (s *RetrySuite) TestUnary_OnRetryCallbackCalled() {
+	retryCallbackCount := 0
+
+	s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors
+	out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing,
+		retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
+			retryCallbackCount++
+		}),
+	)
+
+	require.NoError(s.T(), err, "the third invocation should succeed")
+	require.NotNil(s.T(), out, "Pong must not be nil")
+	require.EqualValues(s.T(), 2, retryCallbackCount, "two retry callbacks should be called")
+}
+
 func (s *RetrySuite) TestServerStream_SucceedsOnRetriableError() {
 	s.srv.resetFailingConfiguration(3, codes.DataLoss, noSleep) // see retriable_errors
 	stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList)
@@ -266,6 +281,21 @@ func (s *RetrySuite) TestServerStream_PerCallDeadline_FailsOnParent() {
 	require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failure code must be a gRPC error of Deadline class")
 }
 
+func (s *RetrySuite) TestServerStream_OnRetryCallbackCalled() {
+	retryCallbackCount := 0
+
+	s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors
+	stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList,
+		retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
+			retryCallbackCount++
+		}),
+	)
+
+	require.NoError(s.T(), err, "establishing the connection must always succeed")
+	s.assertPingListWasCorrect(stream)
+	require.EqualValues(s.T(), 2, retryCallbackCount, "two retry callbacks should be called")
+}
+

 func (s *RetrySuite) TestServerStream_CallFailsOnOutOfRetries() {
 	restarted := s.RestartServer(3 * retryTimeout)
 	_, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList)