pkg: add a new retry to support the error isRetryable or not (#1848) #2158

Merged
5 changes: 5 additions & 0 deletions errors.toml
@@ -636,6 +636,11 @@ error = '''
pulsar send message failed
'''

["CDC:ErrReachMaxTry"]
error = '''
reach maximum try: %d
'''

["CDC:ErrReactorFinished"]
error = '''
the reactor has done its job and should no longer be executed
3 changes: 3 additions & 0 deletions pkg/errors/errors.go
@@ -230,4 +230,7 @@ var (
// miscellaneous internal errors
ErrFlowControllerAborted = errors.Normalize("flow controller is aborted", errors.RFCCodeText("CDC:ErrFlowControllerAborted"))
ErrFlowControllerEventLargerThanQuota = errors.Normalize("event is larger than the total memory quota, size: %d, quota: %d", errors.RFCCodeText("CDC:ErrFlowControllerEventLargerThanQuota"))

// retry error
ErrReachMaxTry = errors.Normalize("reach maximum try: %d", errors.RFCCodeText("CDC:ErrReachMaxTry"))
)
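For context, this is the error that the new retry.Do (added below in pkg/retry/retry_with_opt.go) returns once the retry budget is exhausted. A minimal sketch of what a caller sees, assuming the import paths from this PR; the "connection refused" failure is purely illustrative:

package main

import (
    "context"
    "fmt"

    "github.com/pingcap/errors"
    "github.com/pingcap/ticdc/pkg/retry"
)

func main() {
    err := retry.Do(context.Background(), func() error {
        return errors.New("connection refused")
    }, retry.WithMaxTries(3))

    // The returned error carries the CDC:ErrReachMaxTry RFC code in its message.
    fmt.Println(err)
    // errors.Cause still recovers the original error, as the tests below assert.
    fmt.Println(errors.Cause(err))
}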
91 changes: 91 additions & 0 deletions pkg/retry/options.go
@@ -0,0 +1,91 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"math"
)

const (
// defaultBackoffBaseInMs is the initial backoff duration, in milliseconds
defaultBackoffBaseInMs = 10.0
// defaultBackoffCapInMs is the maximum backoff duration, in milliseconds
defaultBackoffCapInMs = 100.0
defaultMaxTries = 3
)

// Option is a functional option that tweaks the retry behavior
type Option func(*retryOptions)

// IsRetryable checks whether an error is safe or worth retrying; for example, "context.Canceled" is better not retried
type IsRetryable func(error) bool

type retryOptions struct {
maxTries int64
backoffBaseInMs float64
backoffCapInMs float64
isRetryable IsRetryable
}

func newRetryOptions() *retryOptions {
return &retryOptions{
maxTries: defaultMaxTries,
backoffBaseInMs: defaultBackoffBaseInMs,
backoffCapInMs: defaultBackoffCapInMs,
isRetryable: func(err error) bool { return true },
}
}

// WithBackoffBaseDelay configures the initial delay; if delayInMs <= 0, "defaultBackoffBaseInMs" is used
func WithBackoffBaseDelay(delayInMs int64) Option {
return func(o *retryOptions) {
if delayInMs > 0 {
o.backoffBaseInMs = float64(delayInMs)
}
}
}

// WithBackoffMaxDelay configures the maximum delay; if delayInMs <= 0, "defaultBackoffCapInMs" is used
func WithBackoffMaxDelay(delayInMs int64) Option {
return func(o *retryOptions) {
if delayInMs > 0 {
o.backoffCapInMs = float64(delayInMs)
}
}
}

// WithMaxTries configures the maximum number of tries; if tries <= 0, "defaultMaxTries" is used
func WithMaxTries(tries int64) Option {
return func(o *retryOptions) {
if tries > 0 {
o.maxTries = tries
}
}
}

// WithInfiniteTries configures the retry to run forever (math.MaxInt64 times) until it succeeds or is canceled
func WithInfiniteTries() Option {
return func(o *retryOptions) {
o.maxTries = math.MaxInt64
}
}

// WithIsRetryableErr configures which errors should be retried; if not set, every error is retried by default
func WithIsRetryableErr(f IsRetryable) Option {
return func(o *retryOptions) {
if f != nil {
o.isRetryable = f
}
}
}
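Taken together, the options above feed the new Do entry point defined in pkg/retry/retry_with_opt.go below. A hedged usage sketch; fetchMeta is a hypothetical flaky operation and the delays are arbitrary:

package main

import (
    "context"
    "time"

    "github.com/pingcap/errors"
    "github.com/pingcap/ticdc/pkg/retry"
)

// fetchMeta stands in for a real operation; it is not part of this PR.
func fetchMeta(ctx context.Context) error {
    return errors.New("temporarily unavailable")
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    err := retry.Do(ctx, func() error { return fetchMeta(ctx) },
        retry.WithBackoffBaseDelay(20), // first waits start around 20ms
        retry.WithBackoffMaxDelay(500), // each wait is capped at 500ms
        retry.WithMaxTries(5),          // at most 5 calls in total
        retry.WithIsRetryableErr(func(err error) bool {
            // give up immediately on cancellation instead of burning the budget
            return errors.Cause(err) != context.Canceled
        }),
    )
    if err != nil {
        println(err.Error()) // either CDC:ErrReachMaxTry or the non-retryable error
    }
}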
2 changes: 1 addition & 1 deletion pkg/retry/retry.go
@@ -1,4 +1,4 @@
// Copyright 2020 PingCAP, Inc.
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
137 changes: 136 additions & 1 deletion pkg/retry/retry_test.go
@@ -1,4 +1,4 @@
// Copyright 2020 PingCAP, Inc.
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@ package retry

import (
"context"
"math"
"testing"
"time"

@@ -39,6 +40,8 @@ func (s *runSuite) TestShouldRetryAtMostSpecifiedTimes(c *check.C) {

err := Run(500*time.Millisecond, 3, f)
c.Assert(err, check.ErrorMatches, "test")
// 👇 i think tries = first call + maxRetries, so not weird 😎

// It's weird that backoff may retry one more time than maxTries.
// Because the steps in backoff.Retry are:
// 1. Call function
@@ -121,3 +124,135 @@ func (s *runSuite) TestInfiniteRetry(c *check.C) {
c.Assert(reportedElapsed, check.Greater, time.Second)
c.Assert(reportedElapsed, check.LessEqual, 3*time.Second)
}

func (s *runSuite) TestDoShouldRetryAtMostSpecifiedTimes(c *check.C) {
defer testleak.AfterTest(c)()
var callCount int
f := func() error {
callCount++
return errors.New("test")
}

err := Do(context.Background(), f, WithMaxTries(3))
c.Assert(errors.Cause(err), check.ErrorMatches, "test")
c.Assert(callCount, check.Equals, 3)
}

func (s *runSuite) TestDoShouldStopOnSuccess(c *check.C) {
defer testleak.AfterTest(c)()
var callCount int
f := func() error {
callCount++
if callCount == 2 {
return nil
}
return errors.New("test")
}

err := Do(context.Background(), f, WithMaxTries(3))
c.Assert(err, check.IsNil)
c.Assert(callCount, check.Equals, 2)
}

func (s *runSuite) TestIsRetryable(c *check.C) {
defer testleak.AfterTest(c)()
var callCount int
f := func() error {
callCount++
return errors.Annotate(context.Canceled, "test")
}

err := Do(context.Background(), f, WithMaxTries(3), WithIsRetryableErr(func(err error) bool {
switch errors.Cause(err) {
case context.Canceled:
return false
}
return true
}))

c.Assert(errors.Cause(err), check.Equals, context.Canceled)
c.Assert(callCount, check.Equals, 1)

callCount = 0
err = Do(context.Background(), f, WithMaxTries(3))

c.Assert(errors.Cause(err), check.Equals, context.Canceled)
c.Assert(callCount, check.Equals, 3)
}

func (s *runSuite) TestDoCancelInfiniteRetry(c *check.C) {
defer testleak.AfterTest(c)()
callCount := 0
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20)
defer cancel()
f := func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
callCount++
return errors.New("test")
}

err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10))
c.Assert(errors.Cause(err), check.Equals, context.DeadlineExceeded)
c.Assert(callCount, check.GreaterEqual, 1, check.Commentf("tries: %d", callCount))
c.Assert(callCount, check.Less, math.MaxInt64)
}

func (s *runSuite) TestDoCancelAtBeginning(c *check.C) {
defer testleak.AfterTest(c)()
callCount := 0
ctx, cancel := context.WithCancel(context.Background())
cancel()
f := func() error {
callCount++
return errors.New("test")
}

err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10))
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
c.Assert(callCount, check.Equals, 0, check.Commentf("tries:%d", callCount))
}

func (s *runSuite) TestDoCornerCases(c *check.C) {
defer testleak.AfterTest(c)()
var callCount int
f := func() error {
callCount++
return errors.New("test")
}

err := Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2))
c.Assert(errors.Cause(err), check.ErrorMatches, "test")
c.Assert(callCount, check.Equals, 2)

callCount = 0
err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2))
c.Assert(errors.Cause(err), check.ErrorMatches, "test")
c.Assert(callCount, check.Equals, 2)

callCount = 0
err = Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2))
c.Assert(errors.Cause(err), check.ErrorMatches, "test")
c.Assert(callCount, check.Equals, 2)

callCount = 0
err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2))
c.Assert(errors.Cause(err), check.ErrorMatches, "test")
c.Assert(callCount, check.Equals, 2)

var i int64
for i = -10; i < 10; i++ {
callCount = 0
err = Do(context.Background(), f, WithBackoffBaseDelay(i), WithBackoffMaxDelay(i), WithMaxTries(i))
c.Assert(errors.Cause(err), check.ErrorMatches, "test")
c.Assert(err, check.ErrorMatches, ".*CDC:ErrReachMaxTry.*")
if i > 0 {
c.Assert(int64(callCount), check.Equals, i)
} else {
c.Assert(callCount, check.Equals, defaultMaxTries)
}
}
}
94 changes: 94 additions & 0 deletions pkg/retry/retry_with_opt.go
@@ -0,0 +1,94 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"math"
"math/rand"
"time"

"github.com/pingcap/errors"
cerror "github.com/pingcap/ticdc/pkg/errors"
)

// Operation is the action that needs to be retried
type Operation func() error

// Do executes the specified function at most maxTries times until it succeeds or gets canceled
func Do(ctx context.Context, operation Operation, opts ...Option) error {
retryOption := setOptions(opts...)
return run(ctx, operation, retryOption)
}

func setOptions(opts ...Option) *retryOptions {
retryOption := newRetryOptions()
for _, opt := range opts {
opt(retryOption)
}
return retryOption
}

func run(ctx context.Context, op Operation, retryOption *retryOptions) error {
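// bail out immediately if the context is already canceled; TestDoCancelAtBeginning above relies on the operation never being invoked in that case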
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
default:
}

var t *time.Timer
try := 0
backOff := time.Duration(0)
for {
err := op()
if err == nil {
return nil
}

if !retryOption.isRetryable(err) {
return err
}

try++
if int64(try) >= retryOption.maxTries {
return cerror.ErrReachMaxTry.Wrap(err).GenWithStackByArgs(retryOption.maxTries)
}

backOff = getBackoffInMs(retryOption.backoffBaseInMs, retryOption.backoffCapInMs, float64(try))
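// lazily create the timer on the first failed try and reuse it afterwards, instead of allocating a new timer per retry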
if t == nil {
t = time.NewTimer(backOff)
defer t.Stop()
} else {
t.Reset(backOff)
}

select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-t.C:
}
}
}

// getBackoffInMs returns the duration to wait before the next try
// See https://www.awsarchitectureblog.com/2015/03/backoff.html
func getBackoffInMs(backoffBaseInMs, backoffCapInMs, try float64) time.Duration {
// exponential component: half of min(cap, base*2^try), floored at 1ms
temp := int64(math.Min(backoffCapInMs, backoffBaseInMs*math.Exp2(try)) / 2)
if temp <= 0 {
temp = 1
}
// add random jitter so concurrent retries spread out; the final wait never exceeds backoffCapInMs
sleep := temp + rand.Int63n(temp)
backOff := math.Min(backoffCapInMs, float64(rand.Int63n(sleep*3))+backoffBaseInMs)
return time.Duration(backOff) * time.Millisecond
}
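To make the numbers concrete, here is a small standalone sketch (not part of the PR) that reuses the same arithmetic and prints a few sample waits under the default base (10ms) and cap (100ms); the exact values vary because of the jitter:

package main

import (
    "fmt"
    "math"
    "math/rand"
    "time"
)

// sampleBackoff mirrors getBackoffInMs above: an exponential component with
// random jitter, never exceeding capInMs.
func sampleBackoff(baseInMs, capInMs, try float64) time.Duration {
    temp := int64(math.Min(capInMs, baseInMs*math.Exp2(try)) / 2)
    if temp <= 0 {
        temp = 1
    }
    sleep := temp + rand.Int63n(temp)
    backOff := math.Min(capInMs, float64(rand.Int63n(sleep*3))+baseInMs)
    return time.Duration(backOff) * time.Millisecond
}

func main() {
    for try := 1.0; try <= 5; try++ {
        // waits tend to grow with the try number until they saturate at the cap
        fmt.Printf("try %.0f -> wait %v\n", try, sampleBackoff(10, 100, try))
    }
}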