diff --git a/pkg/restore/backoff.go b/pkg/restore/backoff.go index a84014c11..aa5010ab4 100644 --- a/pkg/restore/backoff.go +++ b/pkg/restore/backoff.go @@ -60,7 +60,7 @@ func newDownloadSSTBackoffer() utils.Backoffer { func (bo *importerBackoffer) NextBackoff(err error) time.Duration { switch errors.Cause(err) { - case errGrpc, errEpochNotMatch, errIngestFailed: + case errGrpc, errEpochNotMatch, errDownloadFailed, errIngestFailed: bo.delayTime = 2 * bo.delayTime bo.attempt-- case errRangeIsEmpty, errRewriteRuleNotFound: diff --git a/pkg/restore/backoff_test.go b/pkg/restore/backoff_test.go index a07c0839b..379324028 100644 --- a/pkg/restore/backoff_test.go +++ b/pkg/restore/backoff_test.go @@ -8,6 +8,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/util/testleak" + "go.uber.org/multierr" "github.com/pingcap/br/pkg/mock" "github.com/pingcap/br/pkg/utils" @@ -29,8 +30,9 @@ func (s *testBackofferSuite) TearDownSuite(c *C) { testleak.AfterTest(c)() } -func (s *testBackofferSuite) TestImporterBackoffer(c *C) { +func (s *testBackofferSuite) TestBackoffWithSuccess(c *C) { var counter int + backoffer := &importerBackoffer{attempt: 10, delayTime: time.Nanosecond, maxDelayTime: time.Nanosecond} err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() switch counter { @@ -39,23 +41,58 @@ func (s *testBackofferSuite) TestImporterBackoffer(c *C) { case 1: return errEpochNotMatch case 2: - return errRangeIsEmpty + return nil } return nil - }, newImportSSTBackoffer()) + }, backoffer) c.Assert(counter, Equals, 3) - c.Assert(err, Equals, errRangeIsEmpty) - - counter = 0 - backoffer := importerBackoffer{ - attempt: 10, - delayTime: time.Nanosecond, - maxDelayTime: time.Nanosecond, - } - err = utils.WithRetry(context.Background(), func() error { + c.Assert(err, IsNil) +} + +func (s *testBackofferSuite) TestBackoffWithFatalError(c *C) { + var counter int + backoffer := &importerBackoffer{attempt: 10, delayTime: time.Nanosecond, maxDelayTime: time.Nanosecond} + err := utils.WithRetry(context.Background(), func() error { + defer func() { counter++ }() + switch counter { + case 0: + return errGrpc + case 1: + return errEpochNotMatch + case 2: + return errDownloadFailed + case 3: + return errRangeIsEmpty + } + return nil + }, backoffer) + c.Assert(counter, Equals, 4) + c.Assert(multierr.Errors(err), DeepEquals, []error{ + errGrpc, + errEpochNotMatch, + errDownloadFailed, + errRangeIsEmpty, + }) +} + +func (s *testBackofferSuite) TestBackoffWithRetryableError(c *C) { + var counter int + backoffer := &importerBackoffer{attempt: 10, delayTime: time.Nanosecond, maxDelayTime: time.Nanosecond} + err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() return errEpochNotMatch - }, &backoffer) + }, backoffer) c.Assert(counter, Equals, 10) - c.Assert(err, Equals, errEpochNotMatch) + c.Assert(multierr.Errors(err), DeepEquals, []error{ + errEpochNotMatch, + errEpochNotMatch, + errEpochNotMatch, + errEpochNotMatch, + errEpochNotMatch, + errEpochNotMatch, + errEpochNotMatch, + errEpochNotMatch, + errEpochNotMatch, + errEpochNotMatch, + }) } diff --git a/pkg/restore/import.go b/pkg/restore/import.go index 0d441014f..178792d02 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -16,6 +16,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/log" "github.com/pingcap/pd/v3/pkg/codec" + "go.uber.org/multierr" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -232,6 +233,7 @@ func (importer *FileImporter) Import( log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos))) // Try to download and ingest the file in every region + regionLoop: for _, regionInfo := range regionInfos { info := regionInfo // Try to download file. @@ -246,15 +248,18 @@ func (importer *FileImporter) Import( return e }, newDownloadSSTBackoffer()) if errDownload != nil { - if errDownload == errRewriteRuleNotFound || errDownload == errRangeIsEmpty { - // Skip this region - log.Warn("download file skipped", - zap.Stringer("file", file), - zap.Stringer("region", info.Region), - zap.Binary("startKey", startKey), - zap.Binary("endKey", endKey), - zap.Error(errDownload)) - continue + for _, e := range multierr.Errors(errDownload) { + switch e { + case errRewriteRuleNotFound, errRangeIsEmpty: + // Skip this region + log.Warn("download file skipped", + zap.Stringer("file", file), + zap.Stringer("region", info.Region), + zap.Binary("startKey", startKey), + zap.Binary("endKey", endKey), + zap.Error(errDownload)) + continue regionLoop + } } log.Error("download file failed", zap.Stringer("file", file), diff --git a/pkg/utils/retry.go b/pkg/utils/retry.go index 1dbbcdad2..8b6461c76 100644 --- a/pkg/utils/retry.go +++ b/pkg/utils/retry.go @@ -5,6 +5,8 @@ package utils import ( "context" "time" + + "go.uber.org/multierr" ) // RetryableFunc presents a retryable opreation @@ -18,25 +20,28 @@ type Backoffer interface { Attempt() int } -// WithRetry retrys a given operation with a backoff policy +// WithRetry retries a given operation with a backoff policy. +// +// Returns nil if `retryableFunc` succeeded at least once. Otherwise, returns a +// multierr containing all errors encountered. func WithRetry( ctx context.Context, retryableFunc RetryableFunc, backoffer Backoffer, ) error { - var lastErr error + var allErrors error for backoffer.Attempt() > 0 { err := retryableFunc() if err != nil { - lastErr = err + allErrors = multierr.Append(allErrors, err) select { case <-ctx.Done(): - return lastErr + return allErrors case <-time.After(backoffer.NextBackoff(err)): } } else { return nil } } - return lastErr + return allErrors }