Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
[3.1] restore: Make all download error as retryable (#298) (#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
kennytm authored May 29, 2020
1 parent d690a8b commit 7d539bb
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 29 deletions.
2 changes: 1 addition & 1 deletion pkg/restore/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func newDownloadSSTBackoffer() utils.Backoffer {

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
switch errors.Cause(err) {
case errGrpc, errEpochNotMatch, errIngestFailed:
case errGrpc, errEpochNotMatch, errDownloadFailed, errIngestFailed:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
case errRangeIsEmpty, errRewriteRuleNotFound:
Expand Down
65 changes: 51 additions & 14 deletions pkg/restore/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/testleak"
"go.uber.org/multierr"

"github.com/pingcap/br/pkg/mock"
"github.com/pingcap/br/pkg/utils"
Expand All @@ -29,8 +30,9 @@ func (s *testBackofferSuite) TearDownSuite(c *C) {
testleak.AfterTest(c)()
}

func (s *testBackofferSuite) TestImporterBackoffer(c *C) {
func (s *testBackofferSuite) TestBackoffWithSuccess(c *C) {
var counter int
backoffer := &importerBackoffer{attempt: 10, delayTime: time.Nanosecond, maxDelayTime: time.Nanosecond}
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
switch counter {
Expand All @@ -39,23 +41,58 @@ func (s *testBackofferSuite) TestImporterBackoffer(c *C) {
case 1:
return errEpochNotMatch
case 2:
return errRangeIsEmpty
return nil
}
return nil
}, newImportSSTBackoffer())
}, backoffer)
c.Assert(counter, Equals, 3)
c.Assert(err, Equals, errRangeIsEmpty)

counter = 0
backoffer := importerBackoffer{
attempt: 10,
delayTime: time.Nanosecond,
maxDelayTime: time.Nanosecond,
}
err = utils.WithRetry(context.Background(), func() error {
c.Assert(err, IsNil)
}

func (s *testBackofferSuite) TestBackoffWithFatalError(c *C) {
var counter int
backoffer := &importerBackoffer{attempt: 10, delayTime: time.Nanosecond, maxDelayTime: time.Nanosecond}
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
switch counter {
case 0:
return errGrpc
case 1:
return errEpochNotMatch
case 2:
return errDownloadFailed
case 3:
return errRangeIsEmpty
}
return nil
}, backoffer)
c.Assert(counter, Equals, 4)
c.Assert(multierr.Errors(err), DeepEquals, []error{
errGrpc,
errEpochNotMatch,
errDownloadFailed,
errRangeIsEmpty,
})
}

func (s *testBackofferSuite) TestBackoffWithRetryableError(c *C) {
var counter int
backoffer := &importerBackoffer{attempt: 10, delayTime: time.Nanosecond, maxDelayTime: time.Nanosecond}
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
return errEpochNotMatch
}, &backoffer)
}, backoffer)
c.Assert(counter, Equals, 10)
c.Assert(err, Equals, errEpochNotMatch)
c.Assert(multierr.Errors(err), DeepEquals, []error{
errEpochNotMatch,
errEpochNotMatch,
errEpochNotMatch,
errEpochNotMatch,
errEpochNotMatch,
errEpochNotMatch,
errEpochNotMatch,
errEpochNotMatch,
errEpochNotMatch,
errEpochNotMatch,
})
}
23 changes: 14 additions & 9 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/v3/pkg/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -232,6 +233,7 @@ func (importer *FileImporter) Import(

log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos)))
// Try to download and ingest the file in every region
regionLoop:
for _, regionInfo := range regionInfos {
info := regionInfo
// Try to download file.
Expand All @@ -246,15 +248,18 @@ func (importer *FileImporter) Import(
return e
}, newDownloadSSTBackoffer())
if errDownload != nil {
if errDownload == errRewriteRuleNotFound || errDownload == errRangeIsEmpty {
// Skip this region
log.Warn("download file skipped",
zap.Stringer("file", file),
zap.Stringer("region", info.Region),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey),
zap.Error(errDownload))
continue
for _, e := range multierr.Errors(errDownload) {
switch e {
case errRewriteRuleNotFound, errRangeIsEmpty:
// Skip this region
log.Warn("download file skipped",
zap.Stringer("file", file),
zap.Stringer("region", info.Region),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey),
zap.Error(errDownload))
continue regionLoop
}
}
log.Error("download file failed",
zap.Stringer("file", file),
Expand Down
15 changes: 10 additions & 5 deletions pkg/utils/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package utils
import (
"context"
"time"

"go.uber.org/multierr"
)

// RetryableFunc presents a retryable opreation
Expand All @@ -18,25 +20,28 @@ type Backoffer interface {
Attempt() int
}

// WithRetry retrys a given operation with a backoff policy
// WithRetry retries a given operation with a backoff policy.
//
// Returns nil if `retryableFunc` succeeded at least once. Otherwise, returns a
// multierr containing all errors encountered.
func WithRetry(
ctx context.Context,
retryableFunc RetryableFunc,
backoffer Backoffer,
) error {
var lastErr error
var allErrors error
for backoffer.Attempt() > 0 {
err := retryableFunc()
if err != nil {
lastErr = err
allErrors = multierr.Append(allErrors, err)
select {
case <-ctx.Done():
return lastErr
return allErrors
case <-time.After(backoffer.NextBackoff(err)):
}
} else {
return nil
}
}
return lastErr
return allErrors
}

0 comments on commit 7d539bb

Please sign in to comment.