From abbe5a6517e85343878a3f091ad20a6d2b0b2246 Mon Sep 17 00:00:00 2001 From: ben1009 Date: Wed, 26 May 2021 18:56:48 +0800 Subject: [PATCH 01/12] add a new retry pkg to support IsRetryableErr option --- pkg/retry/options.go | 92 ++++++++++++++++++++++++++++++++++ pkg/retry/retry_test.go | 98 +++++++++++++++++++++++++++++++++++++ pkg/retry/retry_with_opt.go | 94 +++++++++++++++++++++++++++++++++++ 3 files changed, 284 insertions(+) create mode 100644 pkg/retry/options.go create mode 100644 pkg/retry/retry_with_opt.go diff --git a/pkg/retry/options.go b/pkg/retry/options.go new file mode 100644 index 00000000000..f8a662db8d3 --- /dev/null +++ b/pkg/retry/options.go @@ -0,0 +1,92 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "math" +) + +const ( + // defaultBackoffBaseInMs is the initial duration, in Millisecond + defaultBackoffBaseInMs = 10.0 + // defaultBackoffCapInMs is the max amount of duration, in Millisecond + defaultBackoffCapInMs = 100.0 + defaultMaxTries = 3 +) + +// Option ... +type Option func(*retryOptions) + +// IsRetryableErr checks the error is safe to retry or not, eg. "context.Canceled" better not retry +type IsRetryableErr func(error) bool + +// retryOptions ... +type retryOptions struct { + maxTries float64 + backoffBase float64 + backoffCap float64 + isRetryable IsRetryableErr +} + +func newRetryOptions() *retryOptions { + return &retryOptions{ + maxTries: defaultMaxTries, + backoffBase: defaultBackoffBaseInMs, + backoffCap: defaultBackoffCapInMs, + isRetryable: func(err error) bool { return true }, + } +} + +// WithBackoffBaseDelay configures the initial delay +func WithBackoffBaseDelay(delayInMs int64) Option { + return func(o *retryOptions) { + if delayInMs > 0 { + o.backoffBase = float64(delayInMs) + } + } +} + +// WithBackoffMaxDelay configures the maximum delay +func WithBackoffMaxDelay(delayInMs int64) Option { + return func(o *retryOptions) { + if delayInMs > 0 { + o.backoffCap = float64(delayInMs) + } + } +} + +// WithMaxTries configures maximum tries +func WithMaxTries(tries int64) Option { + return func(o *retryOptions) { + if tries > 0 { + o.maxTries = float64(tries) + } + } +} + +// WithInfiniteTries configures to retry forever till success +func WithInfiniteTries() Option { + return func(o *retryOptions) { + o.maxTries = math.Inf(1) + } +} + +// WithIsRetryableErr configures the error handler, if not set, retry by default +func WithIsRetryableErr(f func(error) bool) Option { + return func(o *retryOptions) { + if f != nil { + o.isRetryable = f + } + } +} diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go index 54b3691be9a..197cf1c5978 100644 --- a/pkg/retry/retry_test.go +++ b/pkg/retry/retry_test.go @@ -15,12 +15,15 @@ package retry import ( "context" + "math" "testing" "time" "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/util/testleak" + "go.uber.org/zap" ) func Test(t *testing.T) { check.TestingT(t) } @@ -39,6 +42,8 @@ func (s *runSuite) TestShouldRetryAtMostSpecifiedTimes(c *check.C) { err := Run(500*time.Millisecond, 3, f) c.Assert(err, check.ErrorMatches, "test") + // 👇 i think tries = first call + maxRetries, so not weird 😎 + // It's weird that backoff may retry one more time than maxTries. // Because the steps in backoff.Retry is: // 1. Call function @@ -121,3 +126,96 @@ func (s *runSuite) TestInfiniteRetry(c *check.C) { c.Assert(reportedElapsed, check.Greater, time.Second) c.Assert(reportedElapsed, check.LessEqual, 3*time.Second) } + +func (s *runSuite) Test_DoShouldRetryAtMostSpecifiedTimes(c *check.C) { + defer testleak.AfterTest(c)() + var callCount int + f := func() error { + callCount++ + return errors.New("test") + } + + err := Do(context.Background(), f, WithMaxTries(3)) + c.Assert(err, check.ErrorMatches, "*test") + c.Assert(callCount, check.Equals, 3) +} + +func (s *runSuite) Test_DoShouldStopOnSuccess(c *check.C) { + defer testleak.AfterTest(c)() + var callCount int + f := func() error { + callCount++ + if callCount == 2 { + return nil + } + return errors.New("test") + } + + err := Do(context.Background(), f, WithMaxTries(3)) + c.Assert(err, check.IsNil) + c.Assert(callCount, check.Equals, 2) +} + +func (s *runSuite) Test_IsRetryable(c *check.C) { + defer testleak.AfterTest(c)() + var callCount int + f := func() error { + callCount++ + return errors.Annotate(context.Canceled, "test") + } + + err := Do(context.Background(), f, WithMaxTries(3), WithIsRetryableErr(func(err error) bool { + switch errors.Cause(err) { + case context.Canceled: + return false + } + return true + })) + + c.Assert(errors.Cause(err), check.Equals, context.Canceled) + c.Assert(callCount, check.Equals, 1) + + callCount = 0 + err = Do(context.Background(), f, WithMaxTries(3)) + + c.Assert(errors.Cause(err), check.Equals, context.Canceled) + c.Assert(callCount, check.Equals, 3) +} + +func (s *runSuite) Test_DoCancelInfiniteRetry(c *check.C) { + defer testleak.AfterTest(c)() + callCount := 0 + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20) + defer cancel() + f := func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + callCount++ + return errors.New("test") + } + + err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) + log.Info("Test_DoCancelInfiniteRetry run f times", zap.Int("tries", callCount)) + c.Assert(err, check.Equals, context.DeadlineExceeded) + c.Assert(callCount, check.GreaterEqual, 1, check.Commentf("tries:%d", callCount)) + c.Assert(float64(callCount), check.Less, math.Inf(1)) +} + +func (s *runSuite) Test_DoCancelAtBeginning(c *check.C) { + defer testleak.AfterTest(c)() + callCount := 0 + ctx, cancel := context.WithCancel(context.Background()) + cancel() + f := func() error { + callCount++ + return errors.New("test") + } + + err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) + log.Info("Test_DoCancelAtBeginning run f times", zap.Int("tries", callCount)) + c.Assert(err, check.Equals, context.Canceled) + c.Assert(callCount, check.Equals, 0, check.Commentf("tries:%d", callCount)) +} diff --git a/pkg/retry/retry_with_opt.go b/pkg/retry/retry_with_opt.go new file mode 100644 index 00000000000..a26de7f0e28 --- /dev/null +++ b/pkg/retry/retry_with_opt.go @@ -0,0 +1,94 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "context" + "math" + "math/rand" + "time" + + "github.com/pingcap/errors" +) + +// Operation is the action need to retry +type Operation func() error + +// Do execute the specified function at most maxTries times until it succeeds or got canceled +func Do(ctx context.Context, operation Operation, opts ...Option) error { + retryOption := setOptions(opts...) + return run(ctx, operation, retryOption) +} + +func setOptions(opts ...Option) *retryOptions { + retryOption := newRetryOptions() + for _, opt := range opts { + opt(retryOption) + } + return retryOption +} + +func run(ctx context.Context, op Operation, retryOption *retryOptions) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + var t *time.Timer + try := 0.0 + backOff := time.Duration(0) + for { + err := op() + if err == nil { + return nil + } + + if !retryOption.isRetryable(err) { + return err + } + + try++ + if try >= retryOption.maxTries { + return errors.Wrapf(err, "reach maximum try:%f", retryOption.maxTries) + } + + backOff = getBackoffInMs(retryOption.backoffBase, retryOption.backoffCap, try) + if t == nil { + t = time.NewTimer(backOff) + defer t.Stop() + } else { + t.Reset(backOff) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + } + } +} + +// getBackoffInMs returns the duration to wait before next try +// See https://www.awsarchitectureblog.com/2015/03/backoff.html +func getBackoffInMs(backoffBaseInMs, backoffCapInMs, try float64) time.Duration { + temp := int64(math.Min(backoffCapInMs, backoffBaseInMs*math.Exp2(try)) / 2) + if temp <= 0 { + temp = 1 + } + sleep := temp + rand.Int63n(temp) + + backOff := math.Min(backoffCapInMs, float64(rand.Int63n(sleep*3))+backoffBaseInMs) + return time.Duration(math.Min(backoffCapInMs, backOff)) * time.Millisecond +} From fec55d50980affb28bbaef61941cbcf329e7491f Mon Sep 17 00:00:00 2001 From: ben1009 Date: Wed, 26 May 2021 18:59:29 +0800 Subject: [PATCH 02/12] update comment --- pkg/retry/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/retry/options.go b/pkg/retry/options.go index f8a662db8d3..9851f750048 100644 --- a/pkg/retry/options.go +++ b/pkg/retry/options.go @@ -82,7 +82,7 @@ func WithInfiniteTries() Option { } } -// WithIsRetryableErr configures the error handler, if not set, retry by default +// WithIsRetryableErr configures the error should retry or not, if not set, retry by default func WithIsRetryableErr(f func(error) bool) Option { return func(o *retryOptions) { if f != nil { From 64599ec1e05cf17cef03577a6466aee846a38f24 Mon Sep 17 00:00:00 2001 From: ben1009 Date: Wed, 26 May 2021 19:13:28 +0800 Subject: [PATCH 03/12] update comment --- pkg/retry/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/retry/options.go b/pkg/retry/options.go index 9851f750048..88033d5acaa 100644 --- a/pkg/retry/options.go +++ b/pkg/retry/options.go @@ -75,7 +75,7 @@ func WithMaxTries(tries int64) Option { } } -// WithInfiniteTries configures to retry forever till success +// WithInfiniteTries configures to retry forever till success or got canceled func WithInfiniteTries() Option { return func(o *retryOptions) { o.maxTries = math.Inf(1) From 605e77fa262940424bd3c6f989aa4d6e407c74fa Mon Sep 17 00:00:00 2001 From: ben1009 Date: Thu, 27 May 2021 10:07:10 +0800 Subject: [PATCH 04/12] update --- pkg/retry/options.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/retry/options.go b/pkg/retry/options.go index 88033d5acaa..d7bebce85b5 100644 --- a/pkg/retry/options.go +++ b/pkg/retry/options.go @@ -28,15 +28,13 @@ const ( // Option ... type Option func(*retryOptions) -// IsRetryableErr checks the error is safe to retry or not, eg. "context.Canceled" better not retry -type IsRetryableErr func(error) bool - // retryOptions ... type retryOptions struct { maxTries float64 backoffBase float64 backoffCap float64 - isRetryable IsRetryableErr + // isRetryable checks the error is safe to retry or not, eg. "context.Canceled" better not retry + isRetryable func(error) bool } func newRetryOptions() *retryOptions { From 1e06be74b69cacf0c3bb73136a5dd61265508a44 Mon Sep 17 00:00:00 2001 From: ben1009 Date: Thu, 27 May 2021 10:43:41 +0800 Subject: [PATCH 05/12] fix comments --- pkg/retry/options.go | 2 +- pkg/retry/retry.go | 2 +- pkg/retry/retry_test.go | 6 +----- pkg/retry/retry_with_opt.go | 8 ++++---- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/pkg/retry/options.go b/pkg/retry/options.go index d7bebce85b5..0791b002b3e 100644 --- a/pkg/retry/options.go +++ b/pkg/retry/options.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 1783a8ab182..61161cce195 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go index 197cf1c5978..7ade382ee32 100644 --- a/pkg/retry/retry_test.go +++ b/pkg/retry/retry_test.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,9 +21,7 @@ import ( "github.com/pingcap/check" "github.com/pingcap/errors" - "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/util/testleak" - "go.uber.org/zap" ) func Test(t *testing.T) { check.TestingT(t) } @@ -198,7 +196,6 @@ func (s *runSuite) Test_DoCancelInfiniteRetry(c *check.C) { } err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) - log.Info("Test_DoCancelInfiniteRetry run f times", zap.Int("tries", callCount)) c.Assert(err, check.Equals, context.DeadlineExceeded) c.Assert(callCount, check.GreaterEqual, 1, check.Commentf("tries:%d", callCount)) c.Assert(float64(callCount), check.Less, math.Inf(1)) @@ -215,7 +212,6 @@ func (s *runSuite) Test_DoCancelAtBeginning(c *check.C) { } err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) - log.Info("Test_DoCancelAtBeginning run f times", zap.Int("tries", callCount)) c.Assert(err, check.Equals, context.Canceled) c.Assert(callCount, check.Equals, 0, check.Commentf("tries:%d", callCount)) } diff --git a/pkg/retry/retry_with_opt.go b/pkg/retry/retry_with_opt.go index a26de7f0e28..c85dd5c9cad 100644 --- a/pkg/retry/retry_with_opt.go +++ b/pkg/retry/retry_with_opt.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -47,7 +47,7 @@ func run(ctx context.Context, op Operation, retryOption *retryOptions) error { } var t *time.Timer - try := 0.0 + try := 0 backOff := time.Duration(0) for { err := op() @@ -60,11 +60,11 @@ func run(ctx context.Context, op Operation, retryOption *retryOptions) error { } try++ - if try >= retryOption.maxTries { + if float64(try) >= retryOption.maxTries { return errors.Wrapf(err, "reach maximum try:%f", retryOption.maxTries) } - backOff = getBackoffInMs(retryOption.backoffBase, retryOption.backoffCap, try) + backOff = getBackoffInMs(retryOption.backoffBase, retryOption.backoffCap, float64(try)) if t == nil { t = time.NewTimer(backOff) defer t.Stop() From 314503366b0e9a2dc18ffdf32125a25eb4355cf4 Mon Sep 17 00:00:00 2001 From: ben1009 Date: Thu, 27 May 2021 11:46:08 +0800 Subject: [PATCH 06/12] change maxTries for float to int --- pkg/retry/options.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/retry/options.go b/pkg/retry/options.go index 0791b002b3e..d26735c2848 100644 --- a/pkg/retry/options.go +++ b/pkg/retry/options.go @@ -30,7 +30,7 @@ type Option func(*retryOptions) // retryOptions ... type retryOptions struct { - maxTries float64 + maxTries int64 backoffBase float64 backoffCap float64 // isRetryable checks the error is safe to retry or not, eg. "context.Canceled" better not retry @@ -68,15 +68,15 @@ func WithBackoffMaxDelay(delayInMs int64) Option { func WithMaxTries(tries int64) Option { return func(o *retryOptions) { if tries > 0 { - o.maxTries = float64(tries) + o.maxTries = tries } } } -// WithInfiniteTries configures to retry forever till success or got canceled +// WithInfiniteTries configures to retry forever (math.MaxInt64 times) till success or got canceled func WithInfiniteTries() Option { return func(o *retryOptions) { - o.maxTries = math.Inf(1) + o.maxTries = math.MaxInt64 } } From af889c1f78d27aa4528b5b698f6d3357dc6839a6 Mon Sep 17 00:00:00 2001 From: ben1009 Date: Thu, 27 May 2021 11:49:55 +0800 Subject: [PATCH 07/12] update --- pkg/retry/retry_with_opt.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/retry/retry_with_opt.go b/pkg/retry/retry_with_opt.go index c85dd5c9cad..4fcb7b02fbb 100644 --- a/pkg/retry/retry_with_opt.go +++ b/pkg/retry/retry_with_opt.go @@ -60,8 +60,8 @@ func run(ctx context.Context, op Operation, retryOption *retryOptions) error { } try++ - if float64(try) >= retryOption.maxTries { - return errors.Wrapf(err, "reach maximum try:%f", retryOption.maxTries) + if int64(try) >= retryOption.maxTries { + return errors.Wrapf(err, "reach maximum try:%d", retryOption.maxTries) } backOff = getBackoffInMs(retryOption.backoffBase, retryOption.backoffCap, float64(try)) From 750c047da21afd2d94f7f97cc253056a25be2402 Mon Sep 17 00:00:00 2001 From: ben1009 Date: Thu, 27 May 2021 15:35:16 +0800 Subject: [PATCH 08/12] fix commnets --- pkg/retry/options.go | 1 - pkg/retry/retry_test.go | 52 ++++++++++++++++++++++++++++++++----- pkg/retry/retry_with_opt.go | 2 +- 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/pkg/retry/options.go b/pkg/retry/options.go index d26735c2848..3d221faf7c8 100644 --- a/pkg/retry/options.go +++ b/pkg/retry/options.go @@ -28,7 +28,6 @@ const ( // Option ... type Option func(*retryOptions) -// retryOptions ... type retryOptions struct { maxTries int64 backoffBase float64 diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go index 7ade382ee32..c3feae46fdb 100644 --- a/pkg/retry/retry_test.go +++ b/pkg/retry/retry_test.go @@ -125,7 +125,7 @@ func (s *runSuite) TestInfiniteRetry(c *check.C) { c.Assert(reportedElapsed, check.LessEqual, 3*time.Second) } -func (s *runSuite) Test_DoShouldRetryAtMostSpecifiedTimes(c *check.C) { +func (s *runSuite) TestDoShouldRetryAtMostSpecifiedTimes(c *check.C) { defer testleak.AfterTest(c)() var callCount int f := func() error { @@ -138,7 +138,7 @@ func (s *runSuite) Test_DoShouldRetryAtMostSpecifiedTimes(c *check.C) { c.Assert(callCount, check.Equals, 3) } -func (s *runSuite) Test_DoShouldStopOnSuccess(c *check.C) { +func (s *runSuite) TestDoShouldStopOnSuccess(c *check.C) { defer testleak.AfterTest(c)() var callCount int f := func() error { @@ -154,7 +154,7 @@ func (s *runSuite) Test_DoShouldStopOnSuccess(c *check.C) { c.Assert(callCount, check.Equals, 2) } -func (s *runSuite) Test_IsRetryable(c *check.C) { +func (s *runSuite) TestIsRetryable(c *check.C) { defer testleak.AfterTest(c)() var callCount int f := func() error { @@ -180,7 +180,7 @@ func (s *runSuite) Test_IsRetryable(c *check.C) { c.Assert(callCount, check.Equals, 3) } -func (s *runSuite) Test_DoCancelInfiniteRetry(c *check.C) { +func (s *runSuite) TestDoCancelInfiniteRetry(c *check.C) { defer testleak.AfterTest(c)() callCount := 0 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20) @@ -198,10 +198,10 @@ func (s *runSuite) Test_DoCancelInfiniteRetry(c *check.C) { err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) c.Assert(err, check.Equals, context.DeadlineExceeded) c.Assert(callCount, check.GreaterEqual, 1, check.Commentf("tries:%d", callCount)) - c.Assert(float64(callCount), check.Less, math.Inf(1)) + c.Assert(callCount, check.Less, math.MaxInt64) } -func (s *runSuite) Test_DoCancelAtBeginning(c *check.C) { +func (s *runSuite) TestDoCancelAtBeginning(c *check.C) { defer testleak.AfterTest(c)() callCount := 0 ctx, cancel := context.WithCancel(context.Background()) @@ -215,3 +215,43 @@ func (s *runSuite) Test_DoCancelAtBeginning(c *check.C) { c.Assert(err, check.Equals, context.Canceled) c.Assert(callCount, check.Equals, 0, check.Commentf("tries:%d", callCount)) } + +func (s *runSuite) TestDoCornerCases(c *check.C) { + defer testleak.AfterTest(c)() + var callCount int + f := func() error { + callCount++ + return errors.New("test") + } + + err := Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2)) + c.Assert(err, check.ErrorMatches, "*test") + c.Assert(callCount, check.Equals, 2) + + callCount = 0 + err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2)) + c.Assert(err, check.ErrorMatches, "*test") + c.Assert(callCount, check.Equals, 2) + + callCount = 0 + err = Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2)) + c.Assert(err, check.ErrorMatches, "*test") + c.Assert(callCount, check.Equals, 2) + + callCount = 0 + err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2)) + c.Assert(err, check.ErrorMatches, "*test") + c.Assert(callCount, check.Equals, 2) + + var i int64 + for i = -10; i < 10; i++ { + callCount = 0 + err = Do(context.Background(), f, WithBackoffBaseDelay(i), WithBackoffMaxDelay(i), WithMaxTries(i)) + c.Assert(err, check.ErrorMatches, "*test") + if i > 0 { + c.Assert(int64(callCount), check.Equals, i) + } else { + c.Assert(callCount, check.Equals, defaultMaxTries) + } + } +} diff --git a/pkg/retry/retry_with_opt.go b/pkg/retry/retry_with_opt.go index 4fcb7b02fbb..bc31bc2f3a0 100644 --- a/pkg/retry/retry_with_opt.go +++ b/pkg/retry/retry_with_opt.go @@ -61,7 +61,7 @@ func run(ctx context.Context, op Operation, retryOption *retryOptions) error { try++ if int64(try) >= retryOption.maxTries { - return errors.Wrapf(err, "reach maximum try:%d", retryOption.maxTries) + return errors.Annotatef(err, "reach maximum try:%d", retryOption.maxTries) } backOff = getBackoffInMs(retryOption.backoffBase, retryOption.backoffCap, float64(try)) From 92c02e64f8020009a314e6a6f49fbbb80ce5187c Mon Sep 17 00:00:00 2001 From: ben1009 Date: Fri, 28 May 2021 10:06:59 +0800 Subject: [PATCH 09/12] fix commnets --- pkg/retry/options.go | 24 ++++++++++++------------ pkg/retry/retry_with_opt.go | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/retry/options.go b/pkg/retry/options.go index 3d221faf7c8..e033000ae51 100644 --- a/pkg/retry/options.go +++ b/pkg/retry/options.go @@ -29,41 +29,41 @@ const ( type Option func(*retryOptions) type retryOptions struct { - maxTries int64 - backoffBase float64 - backoffCap float64 + maxTries int64 + backoffBaseInMs float64 + backoffCapInMs float64 // isRetryable checks the error is safe to retry or not, eg. "context.Canceled" better not retry isRetryable func(error) bool } func newRetryOptions() *retryOptions { return &retryOptions{ - maxTries: defaultMaxTries, - backoffBase: defaultBackoffBaseInMs, - backoffCap: defaultBackoffCapInMs, - isRetryable: func(err error) bool { return true }, + maxTries: defaultMaxTries, + backoffBaseInMs: defaultBackoffBaseInMs, + backoffCapInMs: defaultBackoffCapInMs, + isRetryable: func(err error) bool { return true }, } } -// WithBackoffBaseDelay configures the initial delay +// WithBackoffBaseDelay configures the initial delay, if delayInMs <= 0 "defaultBackoffBaseInMs" will be used func WithBackoffBaseDelay(delayInMs int64) Option { return func(o *retryOptions) { if delayInMs > 0 { - o.backoffBase = float64(delayInMs) + o.backoffBaseInMs = float64(delayInMs) } } } -// WithBackoffMaxDelay configures the maximum delay +// WithBackoffMaxDelay configures the maximum delay, if delayInMs <= 0 "defaultBackoffCapInMs" will be used func WithBackoffMaxDelay(delayInMs int64) Option { return func(o *retryOptions) { if delayInMs > 0 { - o.backoffCap = float64(delayInMs) + o.backoffCapInMs = float64(delayInMs) } } } -// WithMaxTries configures maximum tries +// WithMaxTries configures maximum tries, if tries <= 0 "defaultMaxTries" will be used func WithMaxTries(tries int64) Option { return func(o *retryOptions) { if tries > 0 { diff --git a/pkg/retry/retry_with_opt.go b/pkg/retry/retry_with_opt.go index bc31bc2f3a0..90374ac1039 100644 --- a/pkg/retry/retry_with_opt.go +++ b/pkg/retry/retry_with_opt.go @@ -64,7 +64,7 @@ func run(ctx context.Context, op Operation, retryOption *retryOptions) error { return errors.Annotatef(err, "reach maximum try:%d", retryOption.maxTries) } - backOff = getBackoffInMs(retryOption.backoffBase, retryOption.backoffCap, float64(try)) + backOff = getBackoffInMs(retryOption.backoffBaseInMs, retryOption.backoffCapInMs, float64(try)) if t == nil { t = time.NewTimer(backOff) defer t.Stop() From 6576a0cd06360d0ea1392d73175e1d339e54c006 Mon Sep 17 00:00:00 2001 From: ben1009 Date: Wed, 2 Jun 2021 19:37:30 +0800 Subject: [PATCH 10/12] add new error ErrReachMaxTry --- errors.toml | 5 +++++ pkg/errors/errors.go | 3 +++ pkg/retry/retry_test.go | 19 ++++++++++--------- pkg/retry/retry_with_opt.go | 7 ++++--- 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/errors.toml b/errors.toml index 222c9f4c2d2..592dc6fd6fb 100755 --- a/errors.toml +++ b/errors.toml @@ -636,6 +636,11 @@ error = ''' pulsar send message failed ''' +["CDC:ErrReachMaxTry"] +error = ''' +reach maximum try: %d +''' + ["CDC:ErrReactorFinished"] error = ''' the reactor has done its job and should no longer be executed diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 5a4858eba8d..8d33a056a5a 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -230,4 +230,7 @@ var ( // miscellaneous internal errors ErrFlowControllerAborted = errors.Normalize("flow controller is aborted", errors.RFCCodeText("CDC:ErrFlowControllerAborted")) ErrFlowControllerEventLargerThanQuota = errors.Normalize("event is larger than the total memory quota, size: %d, quota: %d", errors.RFCCodeText("CDC:ErrFlowControllerEventLargerThanQuota")) + + // retry error + ErrReachMaxTry = errors.Normalize("reach maximum try: %d", errors.RFCCodeText("CDC:ErrReachMaxTry")) ) diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go index c3feae46fdb..96a8e72cd3e 100644 --- a/pkg/retry/retry_test.go +++ b/pkg/retry/retry_test.go @@ -134,7 +134,7 @@ func (s *runSuite) TestDoShouldRetryAtMostSpecifiedTimes(c *check.C) { } err := Do(context.Background(), f, WithMaxTries(3)) - c.Assert(err, check.ErrorMatches, "*test") + c.Assert(errors.Cause(err), check.ErrorMatches, "test") c.Assert(callCount, check.Equals, 3) } @@ -196,8 +196,8 @@ func (s *runSuite) TestDoCancelInfiniteRetry(c *check.C) { } err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) - c.Assert(err, check.Equals, context.DeadlineExceeded) - c.Assert(callCount, check.GreaterEqual, 1, check.Commentf("tries:%d", callCount)) + c.Assert(errors.Cause(err), check.Equals, context.DeadlineExceeded) + c.Assert(callCount, check.GreaterEqual, 1, check.Commentf("tries: %d", callCount)) c.Assert(callCount, check.Less, math.MaxInt64) } @@ -212,7 +212,7 @@ func (s *runSuite) TestDoCancelAtBeginning(c *check.C) { } err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10)) - c.Assert(err, check.Equals, context.Canceled) + c.Assert(errors.Cause(err), check.Equals, context.Canceled) c.Assert(callCount, check.Equals, 0, check.Commentf("tries:%d", callCount)) } @@ -225,29 +225,30 @@ func (s *runSuite) TestDoCornerCases(c *check.C) { } err := Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2)) - c.Assert(err, check.ErrorMatches, "*test") + c.Assert(errors.Cause(err), check.ErrorMatches, "test") c.Assert(callCount, check.Equals, 2) callCount = 0 err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2)) - c.Assert(err, check.ErrorMatches, "*test") + c.Assert(errors.Cause(err), check.ErrorMatches, "test") c.Assert(callCount, check.Equals, 2) callCount = 0 err = Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2)) - c.Assert(err, check.ErrorMatches, "*test") + c.Assert(errors.Cause(err), check.ErrorMatches, "test") c.Assert(callCount, check.Equals, 2) callCount = 0 err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2)) - c.Assert(err, check.ErrorMatches, "*test") + c.Assert(errors.Cause(err), check.ErrorMatches, "test") c.Assert(callCount, check.Equals, 2) var i int64 for i = -10; i < 10; i++ { callCount = 0 err = Do(context.Background(), f, WithBackoffBaseDelay(i), WithBackoffMaxDelay(i), WithMaxTries(i)) - c.Assert(err, check.ErrorMatches, "*test") + c.Assert(errors.Cause(err), check.ErrorMatches, "test") + c.Assert(err, check.ErrorMatches, ".*CDC:ErrReachMaxTry.*") if i > 0 { c.Assert(int64(callCount), check.Equals, i) } else { diff --git a/pkg/retry/retry_with_opt.go b/pkg/retry/retry_with_opt.go index 90374ac1039..34741e9f817 100644 --- a/pkg/retry/retry_with_opt.go +++ b/pkg/retry/retry_with_opt.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/errors" + cerror "github.com/pingcap/ticdc/pkg/errors" ) // Operation is the action need to retry @@ -42,7 +43,7 @@ func setOptions(opts ...Option) *retryOptions { func run(ctx context.Context, op Operation, retryOption *retryOptions) error { select { case <-ctx.Done(): - return ctx.Err() + return errors.Trace(ctx.Err()) default: } @@ -61,7 +62,7 @@ func run(ctx context.Context, op Operation, retryOption *retryOptions) error { try++ if int64(try) >= retryOption.maxTries { - return errors.Annotatef(err, "reach maximum try:%d", retryOption.maxTries) + return cerror.ErrReachMaxTry.Wrap(err).GenWithStackByArgs(retryOption.maxTries) } backOff = getBackoffInMs(retryOption.backoffBaseInMs, retryOption.backoffCapInMs, float64(try)) @@ -74,7 +75,7 @@ func run(ctx context.Context, op Operation, retryOption *retryOptions) error { select { case <-ctx.Done(): - return ctx.Err() + return errors.Trace(ctx.Err()) case <-t.C: } } From a5354c89573adaa68ff9aaa13c3875059d464b5a Mon Sep 17 00:00:00 2001 From: ben1009 Date: Thu, 3 Jun 2021 14:24:45 +0800 Subject: [PATCH 11/12] update --- pkg/retry/retry_with_opt.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/retry/retry_with_opt.go b/pkg/retry/retry_with_opt.go index 34741e9f817..b4af380b582 100644 --- a/pkg/retry/retry_with_opt.go +++ b/pkg/retry/retry_with_opt.go @@ -89,7 +89,6 @@ func getBackoffInMs(backoffBaseInMs, backoffCapInMs, try float64) time.Duration temp = 1 } sleep := temp + rand.Int63n(temp) - backOff := math.Min(backoffCapInMs, float64(rand.Int63n(sleep*3))+backoffBaseInMs) - return time.Duration(math.Min(backoffCapInMs, backOff)) * time.Millisecond + return time.Duration(backOff) * time.Millisecond } From eafb5c2989119d1f428d97c394f2d3ee8118ff34 Mon Sep 17 00:00:00 2001 From: ben1009 Date: Thu, 3 Jun 2021 16:00:15 +0800 Subject: [PATCH 12/12] expose IsRetryable func as a type --- pkg/retry/options.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/retry/options.go b/pkg/retry/options.go index e033000ae51..724195a3f0c 100644 --- a/pkg/retry/options.go +++ b/pkg/retry/options.go @@ -28,12 +28,14 @@ const ( // Option ... type Option func(*retryOptions) +// IsRetryable checks the error is safe or worth to retry, eg. "context.Canceled" better not retry +type IsRetryable func(error) bool + type retryOptions struct { maxTries int64 backoffBaseInMs float64 backoffCapInMs float64 - // isRetryable checks the error is safe to retry or not, eg. "context.Canceled" better not retry - isRetryable func(error) bool + isRetryable IsRetryable } func newRetryOptions() *retryOptions { @@ -80,7 +82,7 @@ func WithInfiniteTries() Option { } // WithIsRetryableErr configures the error should retry or not, if not set, retry by default -func WithIsRetryableErr(f func(error) bool) Option { +func WithIsRetryableErr(f IsRetryable) Option { return func(o *retryOptions) { if f != nil { o.isRetryable = f