From d3e950f397e2985d16c78f014b536b059190f8c2 Mon Sep 17 00:00:00 2001 From: Alex Simenduev Date: Thu, 1 Apr 2021 21:53:04 +0300 Subject: [PATCH 1/4] add onRetryCallback callback function --- interceptors/retry/options.go | 39 ++++++++++++++++++++++++++++---- interceptors/retry/retry.go | 4 +++- interceptors/retry/retry_test.go | 26 +++++++++++++++++++++ 3 files changed, 63 insertions(+), 6 deletions(-) diff --git a/interceptors/retry/options.go b/interceptors/retry/options.go index baa4e06f7..c39291dcd 100644 --- a/interceptors/retry/options.go +++ b/interceptors/retry/options.go @@ -26,6 +26,9 @@ var ( backoffFunc: BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration { return BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10)(attempt) }), + onRetryCallback: OnRetryCallbackContext(func(ctx context.Context, attempt uint, err error) { + // By default we don't have any callback logic to execute + }), } ) @@ -45,6 +48,15 @@ type BackoffFunc func(attempt uint) time.Duration // with the next iteration. The context can be used to extract request scoped metadata and context values. type BackoffFuncContext func(ctx context.Context, attempt uint) time.Duration +// OnRetryCallback called in retry attempts flow, which can be used to add additiional logic when retry occurs +// +type OnRetryCallback func(attempt uint, err error) + +// OnRetryCallback called in retry attempts flow, which can be used to add additiional logic when retry occurs +// +// The context can be used to extract request scoped metadata and context values. +type OnRetryCallbackContext func(ctx context.Context, attempt uint, err error) + // Disable disables the retry behaviour on this call, or this interceptor. // // Its semantically the same to `WithMax` @@ -75,6 +87,22 @@ func WithBackoffContext(bf BackoffFuncContext) CallOption { }} } +// WithOnRetryCallback sets the `OnRetryCallback` used to add additional logic when retry occurs. +func WithOnRetryCallback(fn OnRetryCallback) CallOption { + return CallOption{applyFunc: func(o *options) { + o.onRetryCallback = OnRetryCallbackContext(func(ctx context.Context, attempt uint, err error) { + fn(attempt, err) + }) + }} +} + +// WithOnRetryCallbackContext sets the `OnRetryCallbackContext` used to add additional logic when retry occurs. +func WithOnRetryCallbackContext(fn OnRetryCallbackContext) CallOption { + return CallOption{applyFunc: func(o *options) { + o.onRetryCallback = fn + }} +} + // WithCodes sets which codes should be retried. // // Please *use with care*, as you may be retrying non-idempotent calls. @@ -105,11 +133,12 @@ func WithPerRetryTimeout(timeout time.Duration) CallOption { } type options struct { - max uint - perCallTimeout time.Duration - includeHeader bool - codes []codes.Code - backoffFunc BackoffFuncContext + max uint + perCallTimeout time.Duration + includeHeader bool + codes []codes.Code + backoffFunc BackoffFuncContext + onRetryCallback OnRetryCallbackContext } // CallOption is a grpc.CallOption that is local to grpc_retry. diff --git a/interceptors/retry/retry.go b/interceptors/retry/retry.go index 9f98afc27..e142589b4 100644 --- a/interceptors/retry/retry.go +++ b/interceptors/retry/retry.go @@ -48,6 +48,7 @@ func UnaryClientInterceptor(optFuncs ...CallOption) grpc.UnaryClientInterceptor if lastErr == nil { return nil } + callOpts.onRetryCallback(callCtx, attempt, lastErr) logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr) if isContextError(lastErr) { if parentCtx.Err() != nil { @@ -110,7 +111,7 @@ func StreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientIntercepto } return retryingStreamer, nil } - + callOpts.onRetryCallback(callCtx, attempt, lastErr) logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr) if isContextError(lastErr) { if parentCtx.Err() != nil { @@ -191,6 +192,7 @@ func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error { } // TODO(bwplotka): Close cancel as it might leak some resources. callCtx, _ := perCallContext(s.parentCtx, s.callOpts, attempt) //nolint + s.callOpts.onRetryCallback(callCtx, attempt, lastErr) newStream, err := s.reestablishStreamAndResendBuffer(callCtx) if err != nil { // Retry dial and transport errors of establishing stream as grpc doesn't retry. diff --git a/interceptors/retry/retry_test.go b/interceptors/retry/retry_test.go index d3372f7f8..69093c7de 100644 --- a/interceptors/retry/retry_test.go +++ b/interceptors/retry/retry_test.go @@ -215,6 +215,19 @@ func (s *RetrySuite) TestUnary_PerCallDeadline_FailsOnParent() { require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class") } +func (s *RetrySuite) TestUnary_OnRetryCallbackCalled() { + retryCallbackCount := 0 + + s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors + out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithOnRetryCallback(func(attempt uint, err error) { + retryCallbackCount++ + })) + + require.NoError(s.T(), err, "the third invocation should succeed") + require.NotNil(s.T(), out, "Pong must be not nil") + require.EqualValues(s.T(), 2, retryCallbackCount, "two retry callbacks should be called") +} + func (s *RetrySuite) TestServerStream_SucceedsOnRetriableError() { s.srv.resetFailingConfiguration(3, codes.DataLoss, noSleep) // see retriable_errors stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList) @@ -266,6 +279,19 @@ func (s *RetrySuite) TestServerStream_PerCallDeadline_FailsOnParent() { require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class") } +func (s *RetrySuite) TestServerStream_OnRetryCallbackCalled() { + retryCallbackCount := 0 + + s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors + stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithOnRetryCallback(func(attempt uint, err error) { + retryCallbackCount++ + })) + + require.NoError(s.T(), err, "establishing the connection must always succeed") + s.assertPingListWasCorrect(stream) + require.EqualValues(s.T(), 2, retryCallbackCount, "two retry callbacks should be called") +} + func (s *RetrySuite) TestServerStream_CallFailsOnOutOfRetries() { restarted := s.RestartServer(3 * retryTimeout) _, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList) From b8a9cbc928ef337ba2e93eb76afa960aee61b024 Mon Sep 17 00:00:00 2001 From: Alex Simenduev Date: Sat, 3 Apr 2021 15:27:40 +0300 Subject: [PATCH 2/4] use only one OnRetryCallback function, instead of one with and one without context --- interceptors/retry/options.go | 27 +++++++-------------------- interceptors/retry/retry_test.go | 16 ++++++++++------ 2 files changed, 17 insertions(+), 26 deletions(-) diff --git a/interceptors/retry/options.go b/interceptors/retry/options.go index c39291dcd..5d033879b 100644 --- a/interceptors/retry/options.go +++ b/interceptors/retry/options.go @@ -26,7 +26,7 @@ var ( backoffFunc: BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration { return BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10)(attempt) }), - onRetryCallback: OnRetryCallbackContext(func(ctx context.Context, attempt uint, err error) { + onRetryCallback: OnRetryCallback(func(ctx context.Context, attempt uint, err error) { // By default we don't have any callback logic to execute }), } @@ -48,14 +48,8 @@ type BackoffFunc func(attempt uint) time.Duration // with the next iteration. The context can be used to extract request scoped metadata and context values. type BackoffFuncContext func(ctx context.Context, attempt uint) time.Duration -// OnRetryCallback called in retry attempts flow, which can be used to add additiional logic when retry occurs -// -type OnRetryCallback func(attempt uint, err error) - -// OnRetryCallback called in retry attempts flow, which can be used to add additiional logic when retry occurs -// -// The context can be used to extract request scoped metadata and context values. -type OnRetryCallbackContext func(ctx context.Context, attempt uint, err error) +// OnRetryCallback is the type of function called when a retry occurs. +type OnRetryCallback func(ctx context.Context, attempt uint, err error) // Disable disables the retry behaviour on this call, or this interceptor. // @@ -87,17 +81,10 @@ func WithBackoffContext(bf BackoffFuncContext) CallOption { }} } -// WithOnRetryCallback sets the `OnRetryCallback` used to add additional logic when retry occurs. +// WithOnRetryCallback sets the callback to use when a retry occurs. +// +// By default, no action is performed on retry. func WithOnRetryCallback(fn OnRetryCallback) CallOption { - return CallOption{applyFunc: func(o *options) { - o.onRetryCallback = OnRetryCallbackContext(func(ctx context.Context, attempt uint, err error) { - fn(attempt, err) - }) - }} -} - -// WithOnRetryCallbackContext sets the `OnRetryCallbackContext` used to add additional logic when retry occurs. -func WithOnRetryCallbackContext(fn OnRetryCallbackContext) CallOption { return CallOption{applyFunc: func(o *options) { o.onRetryCallback = fn }} @@ -138,7 +125,7 @@ type options struct { includeHeader bool codes []codes.Code backoffFunc BackoffFuncContext - onRetryCallback OnRetryCallbackContext + onRetryCallback OnRetryCallback } // CallOption is a grpc.CallOption that is local to grpc_retry. diff --git a/interceptors/retry/retry_test.go b/interceptors/retry/retry_test.go index 69093c7de..c5501b831 100644 --- a/interceptors/retry/retry_test.go +++ b/interceptors/retry/retry_test.go @@ -219,9 +219,11 @@ func (s *RetrySuite) TestUnary_OnRetryCallbackCalled() { retryCallbackCount := 0 s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors - out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, retry.WithOnRetryCallback(func(attempt uint, err error) { - retryCallbackCount++ - })) + out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing, + retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) { + retryCallbackCount++ + }), + ) require.NoError(s.T(), err, "the third invocation should succeed") require.NotNil(s.T(), out, "Pong must be not nil") @@ -283,9 +285,11 @@ func (s *RetrySuite) TestServerStream_OnRetryCallbackCalled() { retryCallbackCount := 0 s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors - stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, retry.WithOnRetryCallback(func(attempt uint, err error) { - retryCallbackCount++ - })) + stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList, + retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) { + retryCallbackCount++ + }), + ) require.NoError(s.T(), err, "establishing the connection must always succeed") s.assertPingListWasCorrect(stream) From f4381afd4fe1efbc188f8cd14d18da119688478b Mon Sep 17 00:00:00 2001 From: Alex Simenduev Date: Sat, 3 Apr 2021 15:51:30 +0300 Subject: [PATCH 3/4] move retry attempt trace log to be the default onRetryCallback behaviour --- interceptors/retry/options.go | 4 ++-- interceptors/retry/retry.go | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/interceptors/retry/options.go b/interceptors/retry/options.go index 5d033879b..6f147c673 100644 --- a/interceptors/retry/options.go +++ b/interceptors/retry/options.go @@ -27,7 +27,7 @@ var ( return BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10)(attempt) }), onRetryCallback: OnRetryCallback(func(ctx context.Context, attempt uint, err error) { - // By default we don't have any callback logic to execute + logTrace(ctx, "grpc_retry attempt: %d, backoff for %v", attempt, err) }), } ) @@ -83,7 +83,7 @@ func WithBackoffContext(bf BackoffFuncContext) CallOption { // WithOnRetryCallback sets the callback to use when a retry occurs. // -// By default, no action is performed on retry. +// By default, when no callback function provided, we will just print a log to trace func WithOnRetryCallback(fn OnRetryCallback) CallOption { return CallOption{applyFunc: func(o *options) { o.onRetryCallback = fn diff --git a/interceptors/retry/retry.go b/interceptors/retry/retry.go index e142589b4..3e9504ce4 100644 --- a/interceptors/retry/retry.go +++ b/interceptors/retry/retry.go @@ -49,7 +49,6 @@ func UnaryClientInterceptor(optFuncs ...CallOption) grpc.UnaryClientInterceptor return nil } callOpts.onRetryCallback(callCtx, attempt, lastErr) - logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr) if isContextError(lastErr) { if parentCtx.Err() != nil { logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err()) @@ -112,7 +111,6 @@ func StreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientIntercepto return retryingStreamer, nil } callOpts.onRetryCallback(callCtx, attempt, lastErr) - logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr) if isContextError(lastErr) { if parentCtx.Err() != nil { logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err()) From 064733b68f1ed3a87092d962c0a6e652b259936c Mon Sep 17 00:00:00 2001 From: Alex Simenduev Date: Sat, 3 Apr 2021 16:46:52 +0300 Subject: [PATCH 4/4] use parentCtx when calling onRetryCallback function --- interceptors/retry/retry.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/interceptors/retry/retry.go b/interceptors/retry/retry.go index 3e9504ce4..2445fc30a 100644 --- a/interceptors/retry/retry.go +++ b/interceptors/retry/retry.go @@ -48,7 +48,7 @@ func UnaryClientInterceptor(optFuncs ...CallOption) grpc.UnaryClientInterceptor if lastErr == nil { return nil } - callOpts.onRetryCallback(callCtx, attempt, lastErr) + callOpts.onRetryCallback(parentCtx, attempt, lastErr) if isContextError(lastErr) { if parentCtx.Err() != nil { logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err()) @@ -110,7 +110,7 @@ func StreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientIntercepto } return retryingStreamer, nil } - callOpts.onRetryCallback(callCtx, attempt, lastErr) + callOpts.onRetryCallback(parentCtx, attempt, lastErr) if isContextError(lastErr) { if parentCtx.Err() != nil { logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err()) @@ -188,9 +188,9 @@ func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error { if err := waitRetryBackoff(attempt, s.parentCtx, s.callOpts); err != nil { return err } + s.callOpts.onRetryCallback(s.parentCtx, attempt, lastErr) // TODO(bwplotka): Close cancel as it might leak some resources. callCtx, _ := perCallContext(s.parentCtx, s.callOpts, attempt) //nolint - s.callOpts.onRetryCallback(callCtx, attempt, lastErr) newStream, err := s.reestablishStreamAndResendBuffer(callCtx) if err != nil { // Retry dial and transport errors of establishing stream as grpc doesn't retry.