From 138386c098a3b7b4a7070c3fc9ed5defbb612235 Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Tue, 26 Nov 2024 12:30:50 -0500 Subject: [PATCH] br: refactor br cli backoff logic (#54644) close pingcap/tidb#54643 --- br/pkg/backup/prepare_snap/env.go | 12 +- br/pkg/backup/prepare_snap/prepare_test.go | 5 +- br/pkg/backup/store.go | 2 +- br/pkg/checkpoint/external_storage.go | 2 +- br/pkg/checksum/executor.go | 4 +- br/pkg/conn/conn.go | 4 +- br/pkg/encryption/master_key/kms_backend.go | 10 +- br/pkg/restore/data/data.go | 47 +- br/pkg/restore/log_client/import.go | 4 +- br/pkg/restore/log_client/import_retry.go | 79 ++-- .../restore/log_client/import_retry_test.go | 56 +-- br/pkg/restore/misc.go | 2 +- br/pkg/restore/snap_client/client.go | 2 +- br/pkg/restore/snap_client/import.go | 6 +- br/pkg/restore/split/client.go | 35 +- br/pkg/restore/split/split.go | 16 +- br/pkg/restore/split/split_test.go | 12 +- br/pkg/task/backup_ebs.go | 2 +- br/pkg/task/operator/prepare_snap.go | 2 +- br/pkg/task/restore.go | 6 +- br/pkg/task/restore_data.go | 31 +- br/pkg/utils/backoff.go | 443 +++++++++++------- br/pkg/utils/backoff_test.go | 46 +- br/pkg/utils/error_handling.go | 8 + br/pkg/utils/retry.go | 76 ++- br/pkg/utils/retry_test.go | 6 +- br/tests/br_file_corruption/run.sh | 6 +- br/tests/br_pitr/run.sh | 4 +- dumpling/export/retry.go | 8 +- 29 files changed, 493 insertions(+), 443 deletions(-) diff --git a/br/pkg/backup/prepare_snap/env.go b/br/pkg/backup/prepare_snap/env.go index 672a052ae555a..f0b2301f76477 100644 --- a/br/pkg/backup/prepare_snap/env.go +++ b/br/pkg/backup/prepare_snap/env.go @@ -176,15 +176,17 @@ func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endK type RetryAndSplitRequestEnv struct { Env - GetBackoffer func() utils.Backoffer + GetBackoffStrategy func() utils.BackoffStrategy } func (r RetryAndSplitRequestEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) { - rs := utils.ConstantBackoff(10 * time.Second) - bo := utils.Backoffer(rs) - if r.GetBackoffer != nil { - bo = r.GetBackoffer() + var bo utils.BackoffStrategy + if r.GetBackoffStrategy != nil { + bo = r.GetBackoffStrategy() + } else { + bo = utils.ConstantBackoff(10 * time.Second) } + cli, err := utils.WithRetryV2(ctx, bo, func(ctx context.Context) (PrepareClient, error) { cli, err := r.Env.ConnectToStore(ctx, storeID) if err != nil { diff --git a/br/pkg/backup/prepare_snap/prepare_test.go b/br/pkg/backup/prepare_snap/prepare_test.go index d6a5a7c16ae31..69be58e800304 100644 --- a/br/pkg/backup/prepare_snap/prepare_test.go +++ b/br/pkg/backup/prepare_snap/prepare_test.go @@ -428,9 +428,8 @@ func TestRetryEnv(t *testing.T) { return nil } ms := RetryAndSplitRequestEnv{Env: tms} - ms.GetBackoffer = func() utils.Backoffer { - o := utils.InitialRetryState(2, 0, 0) - return &o + ms.GetBackoffStrategy = func() utils.BackoffStrategy { + return utils.NewBackoffRetryAllErrorStrategy(2, 0, 0) } prep := New(ms) ctx := context.Background() diff --git a/br/pkg/backup/store.go b/br/pkg/backup/store.go index cf2839d7ef1b7..fce5e3f8e46e4 100644 --- a/br/pkg/backup/store.go +++ b/br/pkg/backup/store.go @@ -275,7 +275,7 @@ func startBackup( } return nil }) - }, utils.NewBackupSSTBackoffer()) + }, utils.NewBackupSSTBackoffStrategy()) }) } return eg.Wait() diff --git a/br/pkg/checkpoint/external_storage.go b/br/pkg/checkpoint/external_storage.go index 078f2f1294e91..47d8ed0296624 100644 --- a/br/pkg/checkpoint/external_storage.go +++ b/br/pkg/checkpoint/external_storage.go @@ -85,7 +85,7 @@ 
func (s *externalCheckpointStorage) getTS(ctx context.Context) (int64, int64, er } return nil - }, utils.NewPDReqBackoffer()) + }, utils.NewAggressivePDBackoffStrategy()) return p, l, errors.Trace(errRetry) } diff --git a/br/pkg/checksum/executor.go b/br/pkg/checksum/executor.go index c61488aaeeeac..3cd0405470562 100644 --- a/br/pkg/checksum/executor.go +++ b/br/pkg/checksum/executor.go @@ -369,8 +369,6 @@ func (exec *Executor) Execute( updateFn func(), ) (*tipb.ChecksumResponse, error) { checksumResp := &tipb.ChecksumResponse{} - checksumBackoffer := utils.InitialRetryState(utils.ChecksumRetryTime, - utils.ChecksumWaitInterval, utils.ChecksumMaxWaitInterval) for _, req := range exec.reqs { // Pointer to SessionVars.Killed // Killed is a flag to indicate that this query is killed. @@ -397,7 +395,7 @@ func (exec *Executor) Execute( return errors.Trace(err) } return nil - }, &checksumBackoffer) + }, utils.NewChecksumBackoffStrategy()) if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index e8883f1dfc581..7ab362e43ffe1 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -116,7 +116,7 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, return errors.Trace(err) }, - utils.NewPDReqBackoffer(), + utils.NewAggressivePDBackoffStrategy(), ) return stores, errors.Trace(errRetry) @@ -420,7 +420,7 @@ func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func return err } return nil - }, utils.NewPDReqBackoffer()) + }, utils.NewAggressivePDBackoffStrategy()) if err != nil { // if one store failed, break and return error return err diff --git a/br/pkg/encryption/master_key/kms_backend.go b/br/pkg/encryption/master_key/kms_backend.go index 1538a379d93bd..23a7b4e189693 100644 --- a/br/pkg/encryption/master_key/kms_backend.go +++ b/br/pkg/encryption/master_key/kms_backend.go @@ -5,6 +5,7 @@ package encryption import ( "context" "sync" + "time" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/encryptionpb" @@ -55,12 +56,11 @@ func (k *KmsBackend) Decrypt(ctx context.Context, content *encryptionpb.Encrypte return k.state.cached.encryptionBackend.DecryptContent(ctx, content) } - // piggyback on NewDownloadSSTBackoffer, a refactor is ongoing to remove all the backoffers - // so user don't need to write a backoffer for every type decryptedKey, err := - utils.WithRetryV2(ctx, utils.NewDownloadSSTBackoffer(), func(ctx context.Context) ([]byte, error) { - return k.kmsProvider.DecryptDataKey(ctx, ciphertextKey) - }) + utils.WithRetryV2(ctx, utils.NewBackoffRetryAllErrorStrategy(10, 500*time.Millisecond, 5*time.Second), + func(ctx context.Context) ([]byte, error) { + return k.kmsProvider.DecryptDataKey(ctx, ciphertextKey) + }) if err != nil { return nil, errors.Annotate(err, "decrypt encrypted key failed") } diff --git a/br/pkg/restore/data/data.go b/br/pkg/restore/data/data.go index d11115a863d8d..bcab0caa433da 100644 --- a/br/pkg/restore/data/data.go +++ b/br/pkg/restore/data/data.go @@ -3,6 +3,7 @@ package data import ( "context" + stdErr "errors" "io" "time" @@ -62,40 +63,27 @@ type recoveryError struct { atStage RecoveryStage } -func FailedAt(err error) RecoveryStage { - if rerr, ok := err.(recoveryError); ok { - return rerr.atStage +func atStage(err error) RecoveryStage { + var recoveryErr recoveryError + if stdErr.As(err, &recoveryErr) { + return recoveryErr.atStage } return StageUnknown } -type recoveryBackoffer struct { - state utils.RetryState -} - -func newRecoveryBackoffer() *recoveryBackoffer { - return 
&recoveryBackoffer{ - state: utils.InitialRetryState(16, 30*time.Second, 4*time.Minute), - } -} - -func (bo *recoveryBackoffer) NextBackoff(err error) time.Duration { - s := FailedAt(err) - switch s { +func isRetryErr(err error) bool { + stage := atStage(err) + switch stage { case StageCollectingMeta, StageMakingRecoveryPlan, StageResetPDAllocateID, StageRecovering: - log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", s)) - return bo.state.ExponentialBackoff() + log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", stage)) + return true case StageFlashback: - log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", s)) - bo.state.GiveUp() - return 0 + log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", stage)) + return false + default: + log.Warn("unknown stage of recovery for backoff.", zap.Int("val", int(stage))) + return false } - log.Warn("unknown stage of backing off.", zap.Int("val", int(s))) - return bo.state.ExponentialBackoff() -} - -func (bo *recoveryBackoffer) Attempt() int { - return bo.state.Attempt() } // RecoverData recover the tikv cluster @@ -109,7 +97,7 @@ func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Stor // Roughly handle the case that some TiKVs are rebooted during making plan. // Generally, retry the whole procedure will be fine for most cases. But perhaps we can do finer-grained retry, // say, we may reuse the recovery plan, and probably no need to rebase PD allocation ID once we have done it. - return utils.WithRetryV2(ctx, newRecoveryBackoffer(), func(ctx context.Context) (int, error) { + return utils.WithRetryV2(ctx, utils.NewRecoveryBackoffStrategy(isRetryErr), func(ctx context.Context) (int, error) { return doRecoveryData(ctx, resolveTS, allStores, mgr, progress, restoreTS, concurrency) }) } @@ -395,7 +383,6 @@ func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) { // prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) { - retryState := utils.InitialRetryState(utils.FlashbackRetryTime, utils.FlashbackWaitInterval, utils.FlashbackMaxWaitInterval) retryErr := utils.WithRetry( ctx, func() error { @@ -416,7 +403,7 @@ func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolve } log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions())) return nil - }, &retryState) + }, utils.NewFlashBackBackoffStrategy()) recovery.progress.Inc() return retryErr diff --git a/br/pkg/restore/log_client/import.go b/br/pkg/restore/log_client/import.go index 138b89d2430a9..a4dbf4ca73839 100644 --- a/br/pkg/restore/log_client/import.go +++ b/br/pkg/restore/log_client/import.go @@ -137,8 +137,8 @@ func (importer *LogFileImporter) ImportKVFiles( // This RetryState will retry 45 time, about 10 min. 
rs := utils.InitialRetryState(45, 100*time.Millisecond, 15*time.Second) - ctl := OverRegionsInRange(startKey, endKey, importer.metaClient, &rs) - err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult { + ctl := CreateRangeController(startKey, endKey, importer.metaClient, &rs) + err = ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult { subfiles, errFilter := filterFilesByRegion(files, ranges, r) if errFilter != nil { return RPCResultFromError(errFilter) diff --git a/br/pkg/restore/log_client/import_retry.go b/br/pkg/restore/log_client/import_retry.go index 93f454d6252e5..dfb5428be3da0 100644 --- a/br/pkg/restore/log_client/import_retry.go +++ b/br/pkg/restore/log_client/import_retry.go @@ -25,7 +25,10 @@ import ( type RegionFunc func(ctx context.Context, r *split.RegionInfo) RPCResult -type OverRegionsInRangeController struct { +// RangeController manages the execution of operations over a range of regions. +// It provides functionality to scan regions within a specified key range and +// apply a given function to each region, handling errors and retries automatically. +type RangeController struct { start []byte end []byte metaClient split.SplitClient @@ -34,10 +37,10 @@ type OverRegionsInRangeController struct { rs *utils.RetryState } -// OverRegionsInRange creates a controller that cloud be used to scan regions in a range and +// CreateRangeController creates a controller that cloud be used to scan regions in a range and // apply a function over these regions. // You can then call the `Run` method for applying some functions. -func OverRegionsInRange(start, end []byte, metaClient split.SplitClient, retryStatus *utils.RetryState) OverRegionsInRangeController { +func CreateRangeController(start, end []byte, metaClient split.SplitClient, retryStatus *utils.RetryState) RangeController { // IMPORTANT: we record the start/end key with TimeStamp. // but scanRegion will drop the TimeStamp and the end key is exclusive. // if we do not use PrefixNextKey. we might scan fewer regions than we expected. @@ -45,7 +48,7 @@ func OverRegionsInRange(start, end []byte, metaClient split.SplitClient, retrySt end = restoreutils.TruncateTS(end) end = kv.PrefixNextKey(end) - return OverRegionsInRangeController{ + return RangeController{ start: start, end: end, metaClient: metaClient, @@ -53,41 +56,30 @@ func OverRegionsInRange(start, end []byte, metaClient split.SplitClient, retrySt } } -func (o *OverRegionsInRangeController) onError(_ context.Context, result RPCResult, region *split.RegionInfo) { +func (o *RangeController) onError(_ context.Context, result RPCResult, region *split.RegionInfo) { o.errors = multierr.Append(o.errors, errors.Annotatef(&result, "execute over region %v failed", region.Region)) // TODO: Maybe handle some of region errors like `epoch not match`? 
} -func (o *OverRegionsInRangeController) tryFindLeader(ctx context.Context, region *split.RegionInfo) (*metapb.Peer, error) { - var leader *metapb.Peer - failed := false - leaderRs := utils.InitialRetryState(4, 5*time.Second, 10*time.Second) - err := utils.WithRetry(ctx, func() error { +func (o *RangeController) tryFindLeader(ctx context.Context, region *split.RegionInfo) (*metapb.Peer, error) { + backoffStrategy := utils.NewBackoffRetryAllErrorStrategy(4, 2*time.Second, 10*time.Second) + return utils.WithRetryV2(ctx, backoffStrategy, func(ctx context.Context) (*metapb.Peer, error) { r, err := o.metaClient.GetRegionByID(ctx, region.Region.Id) if err != nil { - return err + return nil, err } if !split.CheckRegionEpoch(r, region) { - failed = true - return nil + return nil, errors.Annotatef(berrors.ErrKVEpochNotMatch, "the current epoch of %s has changed", region) } if r.Leader != nil { - leader = r.Leader - return nil + return r.Leader, nil } - return errors.Annotatef(berrors.ErrPDLeaderNotFound, "there is no leader for region %d", region.Region.Id) - }, &leaderRs) - if failed { - return nil, errors.Annotatef(berrors.ErrKVEpochNotMatch, "the current epoch of %s is changed", region) - } - if err != nil { - return nil, err - } - return leader, nil + return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound, "there is no leader for region %d", region.Region.Id) + }) } -// handleInRegionError handles the error happens internal in the region. Update the region info, and perform a suitable backoff. -func (o *OverRegionsInRangeController) handleInRegionError(ctx context.Context, result RPCResult, region *split.RegionInfo) (cont bool) { +// handleRegionError handles the error happens internal in the region. Update the region info, and perform a suitable backoff. +func (o *RangeController) handleRegionError(ctx context.Context, result RPCResult, region *split.RegionInfo) (cont bool) { if result.StoreError.GetServerIsBusy() != nil { if strings.Contains(result.StoreError.GetMessage(), "memory is limited") { sleepDuration := 15 * time.Second @@ -126,35 +118,32 @@ func (o *OverRegionsInRangeController) handleInRegionError(ctx context.Context, return true } -func (o *OverRegionsInRangeController) prepareLogCtx(ctx context.Context) context.Context { - lctx := logutil.ContextWithField( +func (o *RangeController) prepareLogCtx(ctx context.Context) context.Context { + return logutil.ContextWithField( ctx, logutil.Key("startKey", o.start), logutil.Key("endKey", o.end), ) - return lctx } -// Run executes the `regionFunc` over the regions in `o.start` and `o.end`. -// It would retry the errors according to the `rpcResponse`. -func (o *OverRegionsInRangeController) Run(ctx context.Context, f RegionFunc) error { - return o.runOverRegions(o.prepareLogCtx(ctx), f) -} +// ApplyFuncToRange apples the `regionFunc` for all regions in `o.start` and `o.end`. +// It would retry errors according to the `rpcResponse`. 
+func (o *RangeController) ApplyFuncToRange(ctx context.Context, f RegionFunc) error { + adjustedCtx := o.prepareLogCtx(ctx) -func (o *OverRegionsInRangeController) runOverRegions(ctx context.Context, f RegionFunc) error { if !o.rs.ShouldRetry() { return o.errors } // Scan regions covered by the file range regionInfos, errScanRegion := split.PaginateScanRegion( - ctx, o.metaClient, o.start, o.end, split.ScanRegionPaginationLimit) + adjustedCtx, o.metaClient, o.start, o.end, split.ScanRegionPaginationLimit) if errScanRegion != nil { return errors.Trace(errScanRegion) } for _, region := range regionInfos { - cont, err := o.runInRegion(ctx, f, region) + cont, err := o.applyFuncToRegion(adjustedCtx, f, region) if err != nil { return err } @@ -165,8 +154,8 @@ func (o *OverRegionsInRangeController) runOverRegions(ctx context.Context, f Reg return nil } -// runInRegion executes the function in the region, and returns `cont = false` if no need for trying for next region. -func (o *OverRegionsInRangeController) runInRegion(ctx context.Context, f RegionFunc, region *split.RegionInfo) (cont bool, err error) { +// applyFuncToRegion executes the function in the region, and returns `cont = false` if no need for trying for next region. +func (o *RangeController) applyFuncToRegion(ctx context.Context, f RegionFunc, region *split.RegionInfo) (cont bool, err error) { if !o.rs.ShouldRetry() { return false, o.errors } @@ -180,16 +169,16 @@ func (o *OverRegionsInRangeController) runInRegion(ctx context.Context, f Region return false, o.errors case StrategyFromThisRegion: logutil.CL(ctx).Warn("retry for region", logutil.Region(region.Region), logutil.ShortError(&result)) - if !o.handleInRegionError(ctx, result, region) { - return false, o.runOverRegions(ctx, f) + if !o.handleRegionError(ctx, result, region) { + return false, o.ApplyFuncToRange(ctx, f) } - return o.runInRegion(ctx, f, region) + return o.applyFuncToRegion(ctx, f, region) case StrategyFromStart: logutil.CL(ctx).Warn("retry for execution over regions", logutil.ShortError(&result)) // TODO: make a backoffer considering more about the error info, // instead of ingore the result and retry. time.Sleep(o.rs.ExponentialBackoff()) - return false, o.runOverRegions(ctx, f) + return false, o.ApplyFuncToRange(ctx, f) } } return true, nil @@ -251,10 +240,6 @@ func (r *RPCResult) StrategyForRetryStoreError() RetryStrategy { } func (r *RPCResult) StrategyForRetryGoError() RetryStrategy { - if r.Err == nil { - return StrategyGiveUp - } - // we should unwrap the error or we cannot get the write gRPC status. if gRPCErr, ok := status.FromError(errors.Cause(r.Err)); ok { switch gRPCErr.Code() { diff --git a/br/pkg/restore/log_client/import_retry_test.go b/br/pkg/restore/log_client/import_retry_test.go index 04f2a56dc342b..a6538831528b5 100644 --- a/br/pkg/restore/log_client/import_retry_test.go +++ b/br/pkg/restore/log_client/import_retry_test.go @@ -93,33 +93,33 @@ func TestScanSuccess(t *testing.T) { ctx := context.Background() // make exclusive to inclusive. 
- ctl := logclient.OverRegionsInRange([]byte("aa"), []byte("aay"), cli, &rs) + ctl := logclient.CreateRangeController([]byte("aa"), []byte("aay"), cli, &rs) collectedRegions := []*split.RegionInfo{} - ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { + ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { collectedRegions = append(collectedRegions, r) return logclient.RPCResultOK() }) assertRegions(t, collectedRegions, "", "aay", "bba") - ctl = logclient.OverRegionsInRange([]byte("aaz"), []byte("bb"), cli, &rs) + ctl = logclient.CreateRangeController([]byte("aaz"), []byte("bb"), cli, &rs) collectedRegions = []*split.RegionInfo{} - ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { + ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { collectedRegions = append(collectedRegions, r) return logclient.RPCResultOK() }) assertRegions(t, collectedRegions, "aay", "bba", "bbh", "cca") - ctl = logclient.OverRegionsInRange([]byte("aa"), []byte("cc"), cli, &rs) + ctl = logclient.CreateRangeController([]byte("aa"), []byte("cc"), cli, &rs) collectedRegions = []*split.RegionInfo{} - ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { + ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { collectedRegions = append(collectedRegions, r) return logclient.RPCResultOK() }) assertRegions(t, collectedRegions, "", "aay", "bba", "bbh", "cca", "") - ctl = logclient.OverRegionsInRange([]byte("aa"), []byte(""), cli, &rs) + ctl = logclient.CreateRangeController([]byte("aa"), []byte(""), cli, &rs) collectedRegions = []*split.RegionInfo{} - ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { + ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { collectedRegions = append(collectedRegions, r) return logclient.RPCResultOK() }) @@ -130,7 +130,7 @@ func TestNotLeader(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(1, 0, 0) - ctl := logclient.OverRegionsInRange([]byte(""), []byte(""), cli, &rs) + ctl := logclient.CreateRangeController([]byte(""), []byte(""), cli, &rs) ctx := context.Background() notLeader := errorpb.Error{ @@ -144,7 +144,7 @@ func TestNotLeader(t *testing.T) { meetRegions := []*split.RegionInfo{} // record all regions we meet with id == 2. idEqualsTo2Regions := []*split.RegionInfo{} - err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { + err := ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { if r.Region.Id == 2 { idEqualsTo2Regions = append(idEqualsTo2Regions, r) } @@ -170,7 +170,7 @@ func TestServerIsBusy(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(2, 0, 0) - ctl := logclient.OverRegionsInRange([]byte(""), []byte(""), cli, &rs) + ctl := logclient.CreateRangeController([]byte(""), []byte(""), cli, &rs) ctx := context.Background() serverIsBusy := errorpb.Error{ @@ -184,7 +184,7 @@ func TestServerIsBusy(t *testing.T) { // record all regions we meet with id == 2. 
idEqualsTo2Regions := []*split.RegionInfo{} theFirstRun := true - err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { + err := ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { if theFirstRun && r.Region.Id == 2 { idEqualsTo2Regions = append(idEqualsTo2Regions, r) theFirstRun = false @@ -199,7 +199,7 @@ func TestServerIsBusy(t *testing.T) { require.NoError(t, err) assertRegions(t, idEqualsTo2Regions, "aay", "bba") assertRegions(t, meetRegions, "", "aay", "bba", "bbh", "cca", "") - require.Equal(t, rs.Attempt(), 1) + require.Equal(t, rs.RemainingAttempts(), 1) } func TestServerIsBusyWithMemoryIsLimited(t *testing.T) { @@ -211,7 +211,7 @@ func TestServerIsBusyWithMemoryIsLimited(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(2, 0, 0) - ctl := logclient.OverRegionsInRange([]byte(""), []byte(""), cli, &rs) + ctl := logclient.CreateRangeController([]byte(""), []byte(""), cli, &rs) ctx := context.Background() serverIsBusy := errorpb.Error{ @@ -225,7 +225,7 @@ func TestServerIsBusyWithMemoryIsLimited(t *testing.T) { // record all regions we meet with id == 2. idEqualsTo2Regions := []*split.RegionInfo{} theFirstRun := true - err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { + err := ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { if theFirstRun && r.Region.Id == 2 { idEqualsTo2Regions = append(idEqualsTo2Regions, r) theFirstRun = false @@ -240,7 +240,7 @@ func TestServerIsBusyWithMemoryIsLimited(t *testing.T) { require.NoError(t, err) assertRegions(t, idEqualsTo2Regions, "aay", "bba") assertRegions(t, meetRegions, "", "aay", "bba", "bbh", "cca", "") - require.Equal(t, rs.Attempt(), 2) + require.Equal(t, rs.RemainingAttempts(), 2) } func printRegion(name string, infos []*split.RegionInfo) { @@ -263,7 +263,7 @@ func TestEpochNotMatch(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(2, 0, 0) - ctl := logclient.OverRegionsInRange([]byte(""), []byte(""), cli, &rs) + ctl := logclient.CreateRangeController([]byte(""), []byte(""), cli, &rs) ctx := context.Background() printPDRegion("cli", cli.RegionsInfo.Regions) @@ -297,7 +297,7 @@ func TestEpochNotMatch(t *testing.T) { firstRunRegions := []*split.RegionInfo{} secondRunRegions := []*split.RegionInfo{} isSecondRun := false - err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { + err = ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { if !isSecondRun && r.Region.Id == left.Region.Id { mergeRegion() isSecondRun = true @@ -322,7 +322,7 @@ func TestRegionSplit(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(2, 0, 0) - ctl := logclient.OverRegionsInRange([]byte(""), []byte(""), cli, &rs) + ctl := logclient.CreateRangeController([]byte(""), []byte(""), cli, &rs) ctx := context.Background() printPDRegion("cli", cli.RegionsInfo.Regions) @@ -375,7 +375,7 @@ func TestRegionSplit(t *testing.T) { firstRunRegions := []*split.RegionInfo{} secondRunRegions := []*split.RegionInfo{} isSecondRun := false - err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { + err = ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r 
*split.RegionInfo) logclient.RPCResult { if !isSecondRun && r.Region.Id == target.Region.Id { splitRegion() isSecondRun = true @@ -400,7 +400,7 @@ func TestRetryBackoff(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(2, time.Millisecond, 10*time.Millisecond) - ctl := logclient.OverRegionsInRange([]byte(""), []byte(""), cli, &rs) + ctl := logclient.CreateRangeController([]byte(""), []byte(""), cli, &rs) ctx := context.Background() printPDRegion("cli", cli.RegionsInfo.Regions) @@ -417,7 +417,7 @@ func TestRetryBackoff(t *testing.T) { }, }} isSecondRun := false - err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { + err = ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { if !isSecondRun && r.Region.Id == left.Region.Id { isSecondRun = true return logclient.RPCResultFromPBError(epochNotLeader) @@ -425,7 +425,7 @@ func TestRetryBackoff(t *testing.T) { return logclient.RPCResultOK() }) printPDRegion("cli", cli.RegionsInfo.Regions) - require.Equal(t, 1, rs.Attempt()) + require.Equal(t, 1, rs.RemainingAttempts()) // we retried leader not found error. so the next backoff should be 2 * initical backoff. require.Equal(t, 2*time.Millisecond, rs.ExponentialBackoff()) require.NoError(t, err) @@ -451,13 +451,13 @@ func TestPaginateScanLeader(t *testing.T) { // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) cli := initTestClient(false) rs := utils.InitialRetryState(2, time.Millisecond, 10*time.Millisecond) - ctl := logclient.OverRegionsInRange([]byte("aa"), []byte("aaz"), cli, &rs) + ctl := logclient.CreateRangeController([]byte("aa"), []byte("aaz"), cli, &rs) ctx := context.Background() cli.InjectErr = true cli.InjectTimes = int32(envInt("PAGINATE_SCAN_LEADER_FAILURE_COUNT", 2)) collectedRegions := []*split.RegionInfo{} - ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { + ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) logclient.RPCResult { collectedRegions = append(collectedRegions, r) return logclient.RPCResultOK() }) @@ -470,7 +470,7 @@ func TestRetryRecognizeErrCode(t *testing.T) { ctx := context.Background() inner := 0 outer := 0 - utils.WithRetry(ctx, func() error { + _ = utils.WithRetry(ctx, func() error { e := utils.WithRetry(ctx, func() error { inner++ e := status.Error(codes.Unavailable, "the connection to TiKV has been cut by a neko, meow :3") @@ -478,10 +478,10 @@ func TestRetryRecognizeErrCode(t *testing.T) { return errors.Trace(e) } return nil - }, utils.NewBackoffer(10, waitTime, maxWaitTime, utils.NewErrorContext("download sst", 3))) + }, utils.NewBackoffRetryAllErrorStrategy(10, waitTime, maxWaitTime)) outer++ return errors.Trace(e) - }, utils.NewBackoffer(10, waitTime, maxWaitTime, utils.NewErrorContext("import sst", 3))) + }, utils.NewBackoffRetryAllErrorStrategy(10, waitTime, maxWaitTime)) require.Equal(t, 10, outer) require.Equal(t, 100, inner) } diff --git a/br/pkg/restore/misc.go b/br/pkg/restore/misc.go index 8280e2171b78c..7bf0564e787eb 100644 --- a/br/pkg/restore/misc.go +++ b/br/pkg/restore/misc.go @@ -149,7 +149,7 @@ func GetTSWithRetry(ctx context.Context, pdClient pd.Client) (uint64, error) { log.Warn("failed to get TS, retry it", zap.Uint("retry time", retry), logutil.ShortError(getTSErr)) } return getTSErr - }, utils.NewPDReqBackoffer()) + }, utils.NewAggressivePDBackoffStrategy()) if err != nil { log.Error("failed to get TS", 
zap.Error(err)) diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go index a7e0ecab3d230..b56ab9e882f55 100644 --- a/br/pkg/restore/snap_client/client.go +++ b/br/pkg/restore/snap_client/client.go @@ -660,7 +660,7 @@ func (rc *SnapClient) ResetTS(ctx context.Context, pdCtrl *pdutil.PdController) log.Info("reset pd timestamp", zap.Uint64("ts", restoreTS)) return utils.WithRetry(ctx, func() error { return pdCtrl.ResetTS(ctx, restoreTS) - }, utils.NewPDReqBackoffer()) + }, utils.NewAggressivePDBackoffStrategy()) } // GetDatabases returns all databases. diff --git a/br/pkg/restore/snap_client/import.go b/br/pkg/restore/snap_client/import.go index 4e71c6fbe0cc4..d1a81f0836362 100644 --- a/br/pkg/restore/snap_client/import.go +++ b/br/pkg/restore/snap_client/import.go @@ -413,7 +413,7 @@ func (importer *SnapFileImporter) Import( log.Debug("ingest file done", logutil.Key("start", startKey), logutil.Key("end", endKey), zap.Stringer("take", time.Since(start))) } return nil - }, utils.NewImportSSTBackoffer()) + }, utils.NewImportSSTBackoffStrategy()) if err != nil { log.Error("import sst file failed after retry, stop the whole progress", restore.ZapBatchBackupFileSet(backupFileSets), zap.Error(err)) return errors.Trace(err) @@ -548,7 +548,7 @@ func (importer *SnapFileImporter) download( } return nil - }, utils.NewDownloadSSTBackoffer()) + }, utils.NewDownloadSSTBackoffStrategy()) return downloadMetas, errDownload } @@ -658,7 +658,7 @@ func (importer *SnapFileImporter) downloadSST( for fileName, req := range downloadReqsMap { var err error var resp *import_sstpb.DownloadResponse - resp, err = utils.WithRetryV2(ectx, utils.NewDownloadSSTBackoffer(), func(ctx context.Context) (*import_sstpb.DownloadResponse, error) { + resp, err = utils.WithRetryV2(ectx, utils.NewDownloadSSTBackoffStrategy(), func(ctx context.Context) (*import_sstpb.DownloadResponse, error) { dctx, cancel := context.WithTimeout(ctx, gRPCTimeOut) defer cancel() return importer.importClient.DownloadSST(dctx, peer.GetStoreId(), req) diff --git a/br/pkg/restore/split/client.go b/br/pkg/restore/split/client.go index fd3af4ee16fae..117809300dde7 100644 --- a/br/pkg/restore/split/client.go +++ b/br/pkg/restore/split/client.go @@ -195,14 +195,11 @@ func (c *pdClient) scatterRegions(ctx context.Context, newRegions []*RegionInfo) c.scatterRegionsSequentially( ctx, newRegions, // backoff about 6s, or we give up scattering this region. 
- &ExponentialBackoffer{ - Attempts: 7, - BaseBackoff: 100 * time.Millisecond, - }) + utils.NewBackoffRetryAllErrorStrategy(7, 100*time.Millisecond, 2*time.Second)) return nil } return err - }, &ExponentialBackoffer{Attempts: 3, BaseBackoff: 500 * time.Millisecond}) + }, utils.NewBackoffRetryAllErrorStrategy(3, 500*time.Millisecond, 2*time.Second)) } func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error { @@ -617,30 +614,12 @@ func (c *pdClient) SplitKeysAndScatter(ctx context.Context, sortedSplitKeys [][] } slices.SortFunc(retrySplitKeys, bytes.Compare) return lastSplitErr - }, newSplitBackoffer()) + }, utils.NewBackoffRetryAllExceptStrategy(SplitRetryTimes, SplitRetryInterval, SplitMaxRetryInterval, isNonRetryErrForSplit)) return ret, errors.Trace(err) } -type splitBackoffer struct { - state utils.RetryState -} - -func newSplitBackoffer() *splitBackoffer { - return &splitBackoffer{ - state: utils.InitialRetryState(SplitRetryTimes, SplitRetryInterval, SplitMaxRetryInterval), - } -} - -func (bo *splitBackoffer) NextBackoff(err error) time.Duration { - if berrors.ErrInvalidRange.Equal(err) { - bo.state.GiveUp() - return 0 - } - return bo.state.ExponentialBackoff() -} - -func (bo *splitBackoffer) Attempt() int { - return bo.state.Attempt() +func isNonRetryErrForSplit(err error) bool { + return berrors.ErrInvalidRange.Equal(err) } func (c *pdClient) SplitWaitAndScatter(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error) { @@ -808,7 +787,7 @@ func (c *pdClient) SetStoresLabel( return nil } -func (c *pdClient) scatterRegionsSequentially(ctx context.Context, newRegions []*RegionInfo, backoffer utils.Backoffer) { +func (c *pdClient) scatterRegionsSequentially(ctx context.Context, newRegions []*RegionInfo, backoffStrategy utils.BackoffStrategy) { newRegionSet := make(map[uint64]*RegionInfo, len(newRegions)) for _, newRegion := range newRegions { newRegionSet[newRegion.Region.Id] = newRegion @@ -832,7 +811,7 @@ func (c *pdClient) scatterRegionsSequentially(ctx context.Context, newRegions [] errs = multierr.Append(errs, err) } return errs - }, backoffer); err != nil { + }, backoffStrategy); err != nil { log.Warn("Some regions haven't been scattered because errors.", zap.Int("count", len(newRegionSet)), // if all region are failed to scatter, the short error might also be verbose... diff --git a/br/pkg/restore/split/split.go b/br/pkg/restore/split/split.go index 726c4b89794fc..7486087402a9f 100644 --- a/br/pkg/restore/split/split.go +++ b/br/pkg/restore/split/split.go @@ -324,6 +324,7 @@ func ScanRegionsWithRetry( return regions, err } +// TODO: merge with backoff.go type WaitRegionOnlineBackoffer struct { Stat utils.RetryState } @@ -358,13 +359,14 @@ func (b *WaitRegionOnlineBackoffer) NextBackoff(err error) time.Duration { return 0 } -// Attempt returns the remain attempt times -func (b *WaitRegionOnlineBackoffer) Attempt() int { - return b.Stat.Attempt() +// RemainingAttempts returns the remain attempt times +func (b *WaitRegionOnlineBackoffer) RemainingAttempts() int { + return b.Stat.RemainingAttempts() } // BackoffMayNotCountBackoffer is a backoffer but it may not increase the retry // counter. It should be used with ErrBackoff or ErrBackoffAndDontCount. +// TODO: merge with backoff.go type BackoffMayNotCountBackoffer struct { state utils.RetryState } @@ -388,7 +390,7 @@ func NewBackoffMayNotCountBackoffer() *BackoffMayNotCountBackoffer { } } -// NextBackoff implements utils.Backoffer. 
For BackoffMayNotCountBackoffer, only +// NextBackoff implements utils.BackoffStrategy. For BackoffMayNotCountBackoffer, only // ErrBackoff and ErrBackoffAndDontCount is meaningful. func (b *BackoffMayNotCountBackoffer) NextBackoff(err error) time.Duration { if errors.ErrorEqual(err, ErrBackoff) { @@ -403,9 +405,9 @@ func (b *BackoffMayNotCountBackoffer) NextBackoff(err error) time.Duration { return 0 } -// Attempt implements utils.Backoffer. -func (b *BackoffMayNotCountBackoffer) Attempt() int { - return b.state.Attempt() +// RemainingAttempts implements utils.BackoffStrategy. +func (b *BackoffMayNotCountBackoffer) RemainingAttempts() int { + return b.state.RemainingAttempts() } // getSplitKeysOfRegions checks every input key is necessary to split region on diff --git a/br/pkg/restore/split/split_test.go b/br/pkg/restore/split/split_test.go index ee53cce560187..6c40cb0cc09b5 100644 --- a/br/pkg/restore/split/split_test.go +++ b/br/pkg/restore/split/split_test.go @@ -99,7 +99,7 @@ func (b *recordCntBackoffer) NextBackoff(error) time.Duration { return 0 } -func (b *recordCntBackoffer) Attempt() int { +func (b *recordCntBackoffer) RemainingAttempts() int { return 100 } @@ -293,19 +293,19 @@ func TestWaitForScatterRegions(t *testing.T) { func TestBackoffMayNotCountBackoffer(t *testing.T) { b := NewBackoffMayNotCountBackoffer() - initVal := b.Attempt() + initVal := b.RemainingAttempts() b.NextBackoff(ErrBackoffAndDontCount) - require.Equal(t, initVal, b.Attempt()) + require.Equal(t, initVal, b.RemainingAttempts()) // test Annotate, which is the real usage in caller b.NextBackoff(errors.Annotate(ErrBackoffAndDontCount, "caller message")) - require.Equal(t, initVal, b.Attempt()) + require.Equal(t, initVal, b.RemainingAttempts()) b.NextBackoff(ErrBackoff) - require.Equal(t, initVal-1, b.Attempt()) + require.Equal(t, initVal-1, b.RemainingAttempts()) b.NextBackoff(goerrors.New("test")) - require.Equal(t, 0, b.Attempt()) + require.Equal(t, 0, b.RemainingAttempts()) } func TestSplitCtxCancel(t *testing.T) { diff --git a/br/pkg/task/backup_ebs.go b/br/pkg/task/backup_ebs.go index d81c498c9f333..40a9011f5fef7 100644 --- a/br/pkg/task/backup_ebs.go +++ b/br/pkg/task/backup_ebs.go @@ -297,7 +297,7 @@ func waitAllScheduleStoppedAndNoRegionHole(ctx context.Context, cfg Config, mgr } // we wait for nearly 15*40 = 600s = 10m backoffer := utils.InitialRetryState(40, 5*time.Second, waitAllScheduleStoppedInterval) - for backoffer.Attempt() > 0 { + for backoffer.RemainingAttempts() > 0 { if ctx.Err() != nil { return ctx.Err() } diff --git a/br/pkg/task/operator/prepare_snap.go b/br/pkg/task/operator/prepare_snap.go index cbe5c3ac2442b..2f846e2ac9dc2 100644 --- a/br/pkg/task/operator/prepare_snap.go +++ b/br/pkg/task/operator/prepare_snap.go @@ -82,7 +82,7 @@ func (cx *AdaptEnvForSnapshotBackupContext) Close() { cx.kvMgr.Close() } -func (cx *AdaptEnvForSnapshotBackupContext) GetBackOffer(operation string) utils.Backoffer { +func (cx *AdaptEnvForSnapshotBackupContext) GetBackOffer(operation string) utils.BackoffStrategy { state := utils.InitialRetryState(64, 1*time.Second, 10*time.Second) bo := utils.GiveUpRetryOn(&state, berrors.ErrPossibleInconsistency) bo = utils.VerboseRetry(bo, logutil.CL(cx).With(zap.String("operation", operation))) diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 90f85cf15a7c7..4250533a248ba 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -1171,7 +1171,7 @@ func getMaxReplica(ctx context.Context, mgr *conn.Mgr) (cnt uint64, err error) { err = 
utils.WithRetry(ctx, func() error { resp, err = mgr.GetPDHTTPClient().GetReplicateConfig(ctx) return err - }, utils.NewPDReqBackoffer()) + }, utils.NewAggressivePDBackoffStrategy()) if err != nil { return 0, errors.Trace(err) } @@ -1188,7 +1188,7 @@ func getStores(ctx context.Context, mgr *conn.Mgr) (stores *http.StoresInfo, err err = utils.WithRetry(ctx, func() error { stores, err = mgr.GetPDHTTPClient().GetStores(ctx) return err - }, utils.NewPDReqBackoffer()) + }, utils.NewAggressivePDBackoffStrategy()) if err != nil { return nil, errors.Trace(err) } @@ -1296,7 +1296,7 @@ func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File, } } return nil - }, utils.NewDiskCheckBackoffer()) + }, utils.NewDiskCheckBackoffStrategy()) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/restore_data.go b/br/pkg/task/restore_data.go index 3adff48e06359..92c5b8dcaecca 100644 --- a/br/pkg/task/restore_data.go +++ b/br/pkg/task/restore_data.go @@ -127,7 +127,7 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto } return nil }, - utils.NewPDReqBackofferExt(), + utils.NewConservativePDBackoffStrategy(), ) restoreNumStores := len(allStores) if restoreNumStores != numStores { @@ -214,10 +214,7 @@ func resetTiFlashReplicas(ctx context.Context, g glue.Glue, storage kv.Storage, return errors.New("tiflash store count is less than expected") } return nil - }, &waitTiFlashBackoffer{ - Attempts: 30, - BaseBackoff: 4 * time.Second, - }) + }, utils.NewBackoffRetryAllErrorStrategy(30, 4*time.Second, 32*time.Second)) if err != nil { return err } @@ -238,27 +235,3 @@ func resetTiFlashReplicas(ctx context.Context, g glue.Glue, storage kv.Storage, return nil }) } - -type waitTiFlashBackoffer struct { - Attempts int - BaseBackoff time.Duration -} - -// NextBackoff returns a duration to wait before retrying again -func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration { - bo := b.BaseBackoff - b.Attempts-- - if b.Attempts == 0 { - return 0 - } - b.BaseBackoff *= 2 - if b.BaseBackoff > 32*time.Second { - b.BaseBackoff = 32 * time.Second - } - return bo -} - -// Attempt returns the remain attempt times -func (b *waitTiFlashBackoffer) Attempt() int { - return b.Attempts -} diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index ee97292ef7fc7..7de5e999654e4 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -7,7 +7,6 @@ import ( "database/sql" "io" "math" - "strings" "time" "github.com/pingcap/errors" @@ -51,21 +50,20 @@ const ( ChecksumWaitInterval = 1 * time.Second ChecksumMaxWaitInterval = 30 * time.Second - gRPC_Cancel = "the client connection is closing" + recoveryMaxAttempts = 16 + recoveryDelayTime = 30 * time.Second + recoveryMaxDelayTime = 4 * time.Minute ) -// At least, there are two possible cancel() call, -// one from go context, another from gRPC, here we retry when gRPC cancel with connection closing -func isGRPCCancel(err error) bool { - if s, ok := status.FromError(err); ok { - if strings.Contains(s.Message(), gRPC_Cancel) { - return true - } - } - return false +// BackoffStrategy implements a backoff strategy for retry operations. +type BackoffStrategy interface { + // NextBackoff returns a duration to wait before retrying again + NextBackoff(err error) time.Duration + // RemainingAttempts returns the remaining number of attempts + RemainingAttempts() int } -// ConstantBackoff is a backoffer that retry forever until success. +// ConstantBackoff is a backoff strategy that retry forever until success. 
type ConstantBackoff time.Duration // NextBackoff returns a duration to wait before retrying again @@ -73,16 +71,17 @@ func (c ConstantBackoff) NextBackoff(err error) time.Duration { return time.Duration(c) } -// Attempt returns the remain attempt times -func (c ConstantBackoff) Attempt() int { +// RemainingAttempts returns the remain attempt times +func (c ConstantBackoff) RemainingAttempts() int { // A large enough value. Also still safe for arithmetic operations (won't easily overflow). return math.MaxInt16 } // RetryState is the mutable state needed for retrying. -// It likes the `utils.Backoffer`, but more fundamental: +// It likes the `utils.BackoffStrategy`, but more fundamental: // this only control the backoff time and knows nothing about what error happens. // NOTE: Maybe also implement the backoffer via this. +// TODO: merge with BackoffStrategy type RetryState struct { maxRetry int retryTimes int @@ -108,7 +107,7 @@ func (rs *RetryState) ShouldRetry() bool { // Get the exponential backoff durion and transform the state. func (rs *RetryState) ExponentialBackoff() time.Duration { rs.retryTimes++ - failpoint.Inject("set-import-attempt-to-one", func(_ failpoint.Value) { + failpoint.Inject("set-remaining-attempts-to-one", func(_ failpoint.Value) { rs.retryTimes = rs.maxRetry }) backoff := rs.nextBackoff @@ -128,204 +127,330 @@ func (rs *RetryState) ReduceRetry() { rs.retryTimes-- } -// Attempt implements the `Backoffer`. +// Attempt implements the `BackoffStrategy`. // TODO: Maybe use this to replace the `exponentialBackoffer` (which is nearly homomorphic to this)? -func (rs *RetryState) Attempt() int { +func (rs *RetryState) RemainingAttempts() int { return rs.maxRetry - rs.retryTimes } -// NextBackoff implements the `Backoffer`. +// NextBackoff implements the `BackoffStrategy`. func (rs *RetryState) NextBackoff(error) time.Duration { return rs.ExponentialBackoff() } -type importerBackoffer struct { - attempt int - delayTime time.Duration - maxDelayTime time.Duration - errContext *ErrorContext +type backoffStrategyImpl struct { + remainingAttempts int + delayTime time.Duration + maxDelayTime time.Duration + errContext *ErrorContext + isRetryErr func(error) bool + isNonRetryErr func(error) bool +} + +// BackoffOption defines a function type for configuring backoffStrategyImpl +type BackoffOption func(*backoffStrategyImpl) + +// WithRemainingAttempts sets the remaining attempts +func WithRemainingAttempts(attempts int) BackoffOption { + return func(b *backoffStrategyImpl) { + b.remainingAttempts = attempts + } +} + +// WithDelayTime sets the initial delay time +func WithDelayTime(delay time.Duration) BackoffOption { + return func(b *backoffStrategyImpl) { + b.delayTime = delay + } } -// NewBackoffer creates a new controller regulating a truncated exponential backoff. 
-func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration, errContext *ErrorContext) Backoffer { - return &importerBackoffer{ - attempt: attempt, - delayTime: delayTime, - maxDelayTime: maxDelayTime, - errContext: errContext, +// WithMaxDelayTime sets the maximum delay time +func WithMaxDelayTime(maxDelay time.Duration) BackoffOption { + return func(b *backoffStrategyImpl) { + b.maxDelayTime = maxDelay } } -func NewImportSSTBackoffer() Backoffer { +// WithErrorContext sets the error context +func WithErrorContext(errContext *ErrorContext) BackoffOption { + return func(b *backoffStrategyImpl) { + b.errContext = errContext + } +} + +// WithRetryErrorFunc sets the retry error checking function +func WithRetryErrorFunc(isRetryErr func(error) bool) BackoffOption { + return func(b *backoffStrategyImpl) { + b.isRetryErr = isRetryErr + } +} + +// WithNonRetryErrorFunc sets the non-retry error checking function +func WithNonRetryErrorFunc(isNonRetryErr func(error) bool) BackoffOption { + return func(b *backoffStrategyImpl) { + b.isNonRetryErr = isNonRetryErr + } +} + +// NewBackoffStrategy creates a new backoff strategy with custom retry logic +func NewBackoffStrategy(opts ...BackoffOption) BackoffStrategy { + // Default values + bs := &backoffStrategyImpl{ + remainingAttempts: 1, + delayTime: time.Second, + maxDelayTime: 10 * time.Second, + errContext: NewZeroRetryContext("default"), + isRetryErr: alwaysTrueFunc(), + isNonRetryErr: alwaysFalseFunc(), + } + + for _, opt := range opts { + opt(bs) + } + + return bs +} + +func NewBackoffRetryAllErrorStrategy(remainingAttempts int, delayTime, maxDelayTime time.Duration) BackoffStrategy { + errContext := NewZeroRetryContext("retry all errors") + return NewBackoffStrategy( + WithRemainingAttempts(remainingAttempts), + WithDelayTime(delayTime), + WithMaxDelayTime(maxDelayTime), + WithErrorContext(errContext), + WithRetryErrorFunc(alwaysTrueFunc()), + WithNonRetryErrorFunc(alwaysFalseFunc()), + ) +} + +func NewBackoffRetryAllExceptStrategy(remainingAttempts int, delayTime, maxDelayTime time.Duration, isNonRetryFunc func(error) bool) BackoffStrategy { + errContext := NewZeroRetryContext("retry all except") + return NewBackoffStrategy( + WithRemainingAttempts(remainingAttempts), + WithDelayTime(delayTime), + WithMaxDelayTime(maxDelayTime), + WithErrorContext(errContext), + WithRetryErrorFunc(alwaysTrueFunc()), + WithNonRetryErrorFunc(isNonRetryFunc), + ) +} + +func NewTiKVStoreBackoffStrategy(maxRetry int, delayTime, maxDelayTime time.Duration, + errContext *ErrorContext) BackoffStrategy { + retryErrs := map[error]struct{}{ + berrors.ErrKVEpochNotMatch: {}, + berrors.ErrKVDownloadFailed: {}, + berrors.ErrKVIngestFailed: {}, + berrors.ErrPDLeaderNotFound: {}, + } + grpcRetryCodes := map[codes.Code]struct{}{ + codes.Canceled: {}, + codes.Unavailable: {}, + codes.Aborted: {}, + codes.DeadlineExceeded: {}, + codes.ResourceExhausted: {}, + codes.Internal: {}, + } + nonRetryErrs := map[error]struct{}{ + context.Canceled: {}, + berrors.ErrKVRangeIsEmpty: {}, + berrors.ErrKVRewriteRuleNotFound: {}, + } + + isRetryErrFunc := buildIsRetryErrFunc(retryErrs, grpcRetryCodes) + isNonRetryErrFunc := buildIsNonRetryErrFunc(nonRetryErrs) + + return NewBackoffStrategy( + WithRemainingAttempts(maxRetry), + WithDelayTime(delayTime), + WithMaxDelayTime(maxDelayTime), + WithErrorContext(errContext), + WithRetryErrorFunc(isRetryErrFunc), + WithNonRetryErrorFunc(isNonRetryErrFunc), + ) +} + +func NewImportSSTBackoffStrategy() BackoffStrategy { errContext := 
NewErrorContext("import sst", 3) - return NewBackoffer(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval, errContext) + return NewTiKVStoreBackoffStrategy(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval, errContext) } -func NewDownloadSSTBackoffer() Backoffer { +func NewDownloadSSTBackoffStrategy() BackoffStrategy { errContext := NewErrorContext("download sst", 3) - return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval, errContext) + return NewTiKVStoreBackoffStrategy(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval, + errContext) } -func NewBackupSSTBackoffer() Backoffer { +func NewBackupSSTBackoffStrategy() BackoffStrategy { errContext := NewErrorContext("backup sst", 3) - return NewBackoffer(backupSSTRetryTimes, backupSSTWaitInterval, backupSSTMaxWaitInterval, errContext) + return NewTiKVStoreBackoffStrategy(backupSSTRetryTimes, backupSSTWaitInterval, backupSSTMaxWaitInterval, errContext) } -func (bo *importerBackoffer) NextBackoff(err error) time.Duration { - // we don't care storeID here. - errs := multierr.Errors(err) - lastErr := errs[len(errs)-1] - res := HandleUnknownBackupError(lastErr.Error(), 0, bo.errContext) - if res.Strategy == StrategyRetry { - bo.delayTime = 2 * bo.delayTime - bo.attempt-- - } else { - e := errors.Cause(lastErr) - switch e { // nolint:errorlint - case berrors.ErrKVEpochNotMatch, berrors.ErrKVDownloadFailed, berrors.ErrKVIngestFailed, berrors.ErrPDLeaderNotFound: - bo.delayTime = 2 * bo.delayTime - bo.attempt-- - case berrors.ErrKVRangeIsEmpty, berrors.ErrKVRewriteRuleNotFound: - // Expected error, finish the operation - bo.delayTime = 0 - bo.attempt = 0 - default: - switch status.Code(e) { - case codes.Unavailable, codes.Aborted, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Internal: - bo.delayTime = 2 * bo.delayTime - bo.attempt-- - case codes.Canceled: - if isGRPCCancel(lastErr) { - bo.delayTime = 2 * bo.delayTime - bo.attempt-- - } else { - bo.delayTime = 0 - bo.attempt = 0 - } - default: - // Unexpected error - bo.delayTime = 0 - bo.attempt = 0 - log.Warn("unexpected error, stop retrying", zap.Error(err)) - } - } +func NewPDBackoffStrategy(maxRetry int, delayTime, maxDelayTime time.Duration) BackoffStrategy { + retryErrs := map[error]struct{}{ + berrors.ErrRestoreTotalKVMismatch: {}, + io.EOF: {}, } - failpoint.Inject("set-import-attempt-to-one", func(_ failpoint.Value) { - if bo.attempt > 1 { - bo.attempt = 1 - } - }) - if bo.delayTime > bo.maxDelayTime { - return bo.maxDelayTime + grpcRetryCodes := map[codes.Code]struct{}{ + codes.Canceled: {}, + codes.DeadlineExceeded: {}, + codes.NotFound: {}, + codes.AlreadyExists: {}, + codes.PermissionDenied: {}, + codes.ResourceExhausted: {}, + codes.Aborted: {}, + codes.OutOfRange: {}, + codes.Unavailable: {}, + codes.DataLoss: {}, + codes.Unknown: {}, } - return bo.delayTime + nonRetryErrs := map[error]struct{}{ + context.Canceled: {}, + context.DeadlineExceeded: {}, + sql.ErrNoRows: {}, + } + + isRetryErrFunc := buildIsRetryErrFunc(retryErrs, grpcRetryCodes) + isNonRetryErrFunc := buildIsNonRetryErrFunc(nonRetryErrs) + + return NewBackoffStrategy( + WithRemainingAttempts(maxRetry), + WithDelayTime(delayTime), + WithMaxDelayTime(maxDelayTime), + WithErrorContext(NewZeroRetryContext("connect PD")), + WithRetryErrorFunc(isRetryErrFunc), + WithNonRetryErrorFunc(isNonRetryErrFunc), + ) } -func (bo *importerBackoffer) Attempt() int { - return bo.attempt +func NewAggressivePDBackoffStrategy() 
BackoffStrategy { + return NewPDBackoffStrategy(resetTSRetryTime, resetTSWaitInterval, resetTSMaxWaitInterval) } -type pdReqBackoffer struct { - attempt int - delayTime time.Duration - maxDelayTime time.Duration +func NewConservativePDBackoffStrategy() BackoffStrategy { + return NewPDBackoffStrategy(resetTSRetryTimeExt, resetTSWaitIntervalExt, resetTSMaxWaitIntervalExt) } -func NewPDReqBackoffer() Backoffer { - return &pdReqBackoffer{ - attempt: resetTSRetryTime, - delayTime: resetTSWaitInterval, - maxDelayTime: resetTSMaxWaitInterval, +func NewDiskCheckBackoffStrategy() BackoffStrategy { + retryErrs := map[error]struct{}{ + berrors.ErrPDInvalidResponse: {}, + berrors.ErrKVDiskFull: {}, } + grpcRetryCodes := map[codes.Code]struct{}{} + + isRetryErrFunc := buildIsRetryErrFunc(retryErrs, grpcRetryCodes) + + return NewBackoffStrategy( + WithRemainingAttempts(resetTSRetryTime), + WithDelayTime(resetTSWaitInterval), + WithErrorContext(NewZeroRetryContext("disk check")), + WithRetryErrorFunc(isRetryErrFunc), + WithNonRetryErrorFunc(alwaysFalseFunc()), + ) } -func NewPDReqBackofferExt() Backoffer { - return &pdReqBackoffer{ - attempt: resetTSRetryTimeExt, - delayTime: resetTSWaitIntervalExt, - maxDelayTime: resetTSMaxWaitIntervalExt, - } +func NewRecoveryBackoffStrategy(isRetryErrFunc func(error) bool) BackoffStrategy { + return NewBackoffStrategy( + WithRemainingAttempts(recoveryMaxAttempts), + WithDelayTime(recoveryDelayTime), + WithErrorContext(NewZeroRetryContext("recovery")), + WithRetryErrorFunc(isRetryErrFunc), + WithNonRetryErrorFunc(alwaysFalseFunc()), + ) +} + +func NewFlashBackBackoffStrategy() BackoffStrategy { + return NewBackoffStrategy( + WithRemainingAttempts(FlashbackRetryTime), + WithDelayTime(FlashbackWaitInterval), + WithErrorContext(NewZeroRetryContext("flashback")), + WithRetryErrorFunc(alwaysTrueFunc()), + WithNonRetryErrorFunc(alwaysFalseFunc()), + ) +} + +func NewChecksumBackoffStrategy() BackoffStrategy { + return NewBackoffStrategy( + WithRemainingAttempts(ChecksumRetryTime), + WithDelayTime(ChecksumWaitInterval), + WithErrorContext(NewZeroRetryContext("checksum")), + WithRetryErrorFunc(alwaysTrueFunc()), + WithNonRetryErrorFunc(alwaysFalseFunc()), + ) } -func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration { - // bo.delayTime = 2 * bo.delayTime - // bo.attempt-- - e := errors.Cause(err) - switch e { // nolint:errorlint - case nil, context.Canceled, context.DeadlineExceeded, sql.ErrNoRows: - // Excepted error, finish the operation - bo.delayTime = 0 - bo.attempt = 0 - case berrors.ErrRestoreTotalKVMismatch, io.EOF: - bo.delayTime = 2 * bo.delayTime - bo.attempt-- - default: - // If the connection timeout, pd client would cancel the context, and return grpc context cancel error. - // So make the codes.Canceled retryable too. - // It's OK to retry the grpc context cancel error, because the parent context cancel returns context.Canceled. - // For example, cancel the `ectx` and then pdClient.GetTS(ectx) returns context.Canceled instead of grpc context canceled. 
- switch status.Code(e) { - case codes.DeadlineExceeded, codes.Canceled, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss, codes.Unknown: - bo.delayTime = 2 * bo.delayTime - bo.attempt-- - default: - // Unexcepted error - bo.delayTime = 0 - bo.attempt = 0 - log.Warn("unexcepted error, stop to retry", zap.Error(err)) +func (bo *backoffStrategyImpl) NextBackoff(err error) time.Duration { + errs := multierr.Errors(err) + lastErr := errs[len(errs)-1] + // we don't care storeID here. + // TODO: should put all the retry logic in one place, right now errContext has internal retry counter as well + res := HandleUnknownBackupError(lastErr.Error(), 0, bo.errContext) + if res.Strategy == StrategyRetry { + bo.doBackoff() + } else if res.Reason == contextCancelledMsg { + // have to hack here due to complex context.cancel/grpc cancel + bo.stopBackoff() + } else { + e := errors.Cause(lastErr) + if bo.isNonRetryErr(e) { + bo.stopBackoff() + } else if bo.isRetryErr(e) { + bo.doBackoff() + } else { + log.Warn("stop retrying on error", zap.Error(err)) + bo.stopBackoff() } } - failpoint.Inject("set-attempt-to-one", func(_ failpoint.Value) { - bo.attempt = 1 + failpoint.Inject("set-remaining-attempts-to-one", func(_ failpoint.Value) { + if bo.remainingAttempts > 1 { + bo.remainingAttempts = 1 + } }) + if bo.delayTime > bo.maxDelayTime { return bo.maxDelayTime } return bo.delayTime } -func (bo *pdReqBackoffer) Attempt() int { - return bo.attempt +func (bo *backoffStrategyImpl) RemainingAttempts() int { + return bo.remainingAttempts +} + +func (bo *backoffStrategyImpl) doBackoff() { + bo.delayTime = 2 * bo.delayTime + bo.remainingAttempts-- } -type DiskCheckBackoffer struct { - attempt int - delayTime time.Duration - maxDelayTime time.Duration +func (bo *backoffStrategyImpl) stopBackoff() { + bo.delayTime = 0 + bo.remainingAttempts = 0 } -func NewDiskCheckBackoffer() Backoffer { - return &DiskCheckBackoffer{ - attempt: resetTSRetryTime, - delayTime: resetTSWaitInterval, - maxDelayTime: resetTSMaxWaitInterval, +func buildIsRetryErrFunc(retryErrs map[error]struct{}, grpcRetryCodes map[codes.Code]struct{}) func(error) bool { + return func(err error) bool { + _, brRetryOk := retryErrs[err] + _, grpcRetryOk := grpcRetryCodes[status.Code(err)] + return brRetryOk || grpcRetryOk } } -func (bo *DiskCheckBackoffer) NextBackoff(err error) time.Duration { - e := errors.Cause(err) - switch e { // nolint:errorlint - case nil, context.Canceled, context.DeadlineExceeded, berrors.ErrKVDiskFull: - bo.delayTime = 0 - bo.attempt = 0 - case berrors.ErrPDInvalidResponse: - bo.delayTime = 2 * bo.delayTime - bo.attempt-- - default: - bo.delayTime = 2 * bo.delayTime - if bo.attempt > 5 { - bo.attempt = 5 - } - bo.attempt-- +func buildIsNonRetryErrFunc(nonRetryErrs map[error]struct{}) func(error) bool { + return func(err error) bool { + _, brNonRetryOk := nonRetryErrs[err] + return brNonRetryOk } +} - if bo.delayTime > bo.maxDelayTime { - return bo.maxDelayTime +func alwaysTrueFunc() func(error) bool { + return func(err error) bool { + return true } - return bo.delayTime } -func (bo *DiskCheckBackoffer) Attempt() int { - return bo.attempt +func alwaysFalseFunc() func(error) bool { + return func(err error) bool { + return false + } } diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index e0bc87fabb95e..e63c2776f1472 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -21,7 +21,7 @@ 
import ( func TestBackoffWithSuccess(t *testing.T) { var counter int - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) + backoffStrategy := utils.NewTiKVStoreBackoffStrategy(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() switch counter { @@ -33,14 +33,14 @@ func TestBackoffWithSuccess(t *testing.T) { return nil } return nil - }, backoffer) + }, backoffStrategy) require.Equal(t, 3, counter) require.NoError(t, err) } -func TestBackoffWithUnknowneErrorSuccess(t *testing.T) { +func TestBackoffWithUnknownErrorSuccess(t *testing.T) { var counter int - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) + backoffStrategy := utils.NewTiKVStoreBackoffStrategy(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() switch counter { @@ -50,14 +50,14 @@ func TestBackoffWithUnknowneErrorSuccess(t *testing.T) { return berrors.ErrKVEpochNotMatch } return nil - }, backoffer) + }, backoffStrategy) require.Equal(t, 3, counter) require.NoError(t, err) } func TestBackoffWithFatalError(t *testing.T) { var counter int - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) + backoffStrategy := utils.NewTiKVStoreBackoffStrategy(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) gRPCError := status.Error(codes.Unavailable, "transport is closing") err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() @@ -72,7 +72,7 @@ func TestBackoffWithFatalError(t *testing.T) { return berrors.ErrKVRangeIsEmpty } return nil - }, backoffer) + }, backoffStrategy) require.Equal(t, 4, counter) require.Equal(t, []error{ gRPCError, @@ -84,7 +84,7 @@ func TestBackoffWithFatalError(t *testing.T) { func TestWithRetryReturnLastErr(t *testing.T) { var counter int - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) + backoffStrategy := utils.NewTiKVStoreBackoffStrategy(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) gRPCError := status.Error(codes.Unavailable, "transport is closing") err := utils.WithRetryReturnLastErr(context.Background(), func() error { defer func() { counter++ }() @@ -99,7 +99,7 @@ func TestWithRetryReturnLastErr(t *testing.T) { return berrors.ErrKVRangeIsEmpty } return nil - }, backoffer) + }, backoffStrategy) require.Equal(t, 4, counter) require.ErrorIs(t, berrors.ErrKVRangeIsEmpty, err) } @@ -107,22 +107,22 @@ func TestWithRetryReturnLastErr(t *testing.T) { func TestBackoffWithFatalRawGRPCError(t *testing.T) { var counter int canceledError := status.Error(codes.Canceled, "context canceled") - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) + backoffStrategy := utils.NewTiKVStoreBackoffStrategy(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() return canceledError // nolint:wrapcheck - }, backoffer) + }, backoffStrategy) require.Equal(t, 1, counter) require.Equal(t, []error{canceledError}, multierr.Errors(err)) } func TestBackoffWithRetryableError(t *testing.T) { var counter int - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) + backoffStrategy := 
utils.NewTiKVStoreBackoffStrategy(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() return berrors.ErrKVEpochNotMatch - }, backoffer) + }, backoffStrategy) require.Equal(t, 10, counter) require.Equal(t, []error{ berrors.ErrKVEpochNotMatch, @@ -140,7 +140,7 @@ func TestBackoffWithRetryableError(t *testing.T) { func TestPdBackoffWithRetryableError(t *testing.T) { var counter int - backoffer := utils.NewPDReqBackoffer() + backoffStrategy := utils.NewAggressivePDBackoffStrategy() gRPCError := status.Error(codes.Unavailable, "transport is closing") err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() @@ -151,7 +151,7 @@ func TestPdBackoffWithRetryableError(t *testing.T) { return context.Canceled } return gRPCError - }, backoffer) + }, backoffStrategy) require.Equal(t, 7, counter) require.Equal(t, []error{ gRPCError, @@ -166,28 +166,28 @@ func TestPdBackoffWithRetryableError(t *testing.T) { func TestNewImportSSTBackofferWithSucess(t *testing.T) { var counter int - backoffer := utils.NewImportSSTBackoffer() + backoffStrategy := utils.NewImportSSTBackoffStrategy() err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() if counter == 5 { return nil } return berrors.ErrKVDownloadFailed - }, backoffer) + }, backoffStrategy) require.Equal(t, 6, counter) require.NoError(t, err) } func TestNewDownloadSSTBackofferWithCancel(t *testing.T) { var counter int - backoffer := utils.NewDownloadSSTBackoffer() + backoffStrategy := utils.NewDownloadSSTBackoffStrategy() err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() if counter == 3 { return context.Canceled } return berrors.ErrKVIngestFailed - }, backoffer) + }, backoffStrategy) require.Equal(t, 4, counter) require.Equal(t, []error{ berrors.ErrKVIngestFailed, @@ -199,14 +199,14 @@ func TestNewDownloadSSTBackofferWithCancel(t *testing.T) { func TestNewBackupSSTBackofferWithCancel(t *testing.T) { var counter int - backoffer := utils.NewBackupSSTBackoffer() + backoffStrategy := utils.NewBackupSSTBackoffStrategy() err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() if counter == 3 { return context.Canceled } return berrors.ErrKVIngestFailed - }, backoffer) + }, backoffStrategy) require.Equal(t, 4, counter) require.Equal(t, []error{ berrors.ErrKVIngestFailed, @@ -218,13 +218,13 @@ func TestNewBackupSSTBackofferWithCancel(t *testing.T) { func TestConstantBackoff(t *testing.T) { backedOff := func(t *testing.T) { - backoffer := utils.ConstantBackoff(10 * time.Millisecond) + backoffStrategy := utils.ConstantBackoff(10 * time.Millisecond) ctx, cancel := context.WithCancel(context.Background()) i := 0 ch := make(chan error) go func() { - _, err := utils.WithRetryV2(ctx, backoffer, func(ctx context.Context) (struct{}, error) { + _, err := utils.WithRetryV2(ctx, backoffStrategy, func(ctx context.Context) (struct{}, error) { i += 1 return struct{}{}, fmt.Errorf("%d times, no meaning", i) }) diff --git a/br/pkg/utils/error_handling.go b/br/pkg/utils/error_handling.go index 6c6c84a2a1884..207363648945f 100644 --- a/br/pkg/utils/error_handling.go +++ b/br/pkg/utils/error_handling.go @@ -111,6 +111,14 @@ func NewDefaultContext() *ErrorContext { } } +func NewZeroRetryContext(scenario string) *ErrorContext { + return &ErrorContext{ + description: scenario, + encounterTimes: make(map[uint64]int), + encounterTimesLimitation: 0, + } +} + 
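// A minimal sketch of how the zero-retry ErrorContext added above combines with
// the option-style constructor and With* options introduced earlier in this diff
// (br/pkg/utils/backoff.go). Only exported names visible in these hunks are used;
// the package name "demo", the attempt/delay constants, and the choice of
// retryable errors are illustrative assumptions, not code from the repository.
package demo

import (
	"time"

	berrors "github.com/pingcap/tidb/br/pkg/errors"
	"github.com/pingcap/tidb/br/pkg/utils"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// newDemoBackoffStrategy retries at most 3 times, starting from a 100ms delay
// that the strategy doubles on every retried error. Because the ErrorContext has
// encounterTimesLimitation == 0, classification falls through to the two
// predicates: only ErrPDInvalidResponse and gRPC Unavailable are treated as
// retryable, and nothing is marked explicitly fatal.
func newDemoBackoffStrategy() utils.BackoffStrategy {
	return utils.NewBackoffStrategy(
		utils.WithRemainingAttempts(3),
		utils.WithDelayTime(100*time.Millisecond),
		utils.WithErrorContext(utils.NewZeroRetryContext("demo")),
		utils.WithRetryErrorFunc(func(err error) bool {
			return err == berrors.ErrPDInvalidResponse || status.Code(err) == codes.Unavailable
		}),
		utils.WithNonRetryErrorFunc(func(err error) bool { return false }),
	)
}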
func HandleBackupError(err *backuppb.Error, storeId uint64, ec *ErrorContext) ErrorHandlingResult { if err == nil { return ErrorHandlingResult{StrategyRetry, unreachableRetryMsg} diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 6671f16b7842e..e1426628b8af2 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -24,14 +24,6 @@ type RetryableFunc func() error type RetryableFuncV2[T any] func(context.Context) (T, error) -// Backoffer implements a backoff policy for retrying operations. -type Backoffer interface { - // NextBackoff returns a duration to wait before retrying again - NextBackoff(err error) time.Duration - // Attempt returns the remain attempt times - Attempt() int -} - // WithRetry retries a given operation with a backoff policy. // // Returns nil if `retryableFunc` succeeded at least once. Otherwise, returns a @@ -39,9 +31,9 @@ type Backoffer interface { func WithRetry( ctx context.Context, retryableFunc RetryableFunc, - backoffer Backoffer, + backoffStrategy BackoffStrategy, ) error { - _, err := WithRetryV2[struct{}](ctx, backoffer, func(ctx context.Context) (struct{}, error) { + _, err := WithRetryV2[struct{}](ctx, backoffStrategy, func(ctx context.Context) (struct{}, error) { innerErr := retryableFunc() return struct{}{}, innerErr }) @@ -55,11 +47,11 @@ func WithRetry( // Comparing with `WithRetry`, this function reordered the argument order and supports catching the return value. func WithRetryV2[T any]( ctx context.Context, - backoffer Backoffer, + backoffStrategy BackoffStrategy, fn RetryableFuncV2[T], ) (T, error) { var allErrors error - for backoffer.Attempt() > 0 { + for backoffStrategy.RemainingAttempts() > 0 { res, err := fn(ctx) if err == nil { return res, nil @@ -69,7 +61,7 @@ func WithRetryV2[T any]( case <-ctx.Done(): // allErrors must not be `nil` here, so ignore the context error. return *new(T), allErrors - case <-time.After(backoffer.NextBackoff(err)): + case <-time.After(backoffStrategy.NextBackoff(err)): } } return *new(T), allErrors // nolint:wrapcheck @@ -84,18 +76,18 @@ var sampleLoggerFactory = logutil.SampleLoggerFactory( func WithRetryReturnLastErr( ctx context.Context, retryableFunc RetryableFunc, - backoffer Backoffer, + backoffStrategy BackoffStrategy, ) error { if err := ctx.Err(); err != nil { return err } var lastErr error - for backoffer.Attempt() > 0 { + for backoffStrategy.RemainingAttempts() > 0 { lastErr = retryableFunc() if lastErr == nil { return nil } - backoff := backoffer.NextBackoff(lastErr) + backoff := backoffStrategy.NextBackoff(lastErr) sampleLoggerFactory().Info( "retryable operation failed", zap.Error(lastErr), zap.Duration("backoff", backoff)) @@ -117,12 +109,12 @@ func FallBack2CreateTable(err error) bool { return false } -// RetryWithBackoffer is a simple context for a "mixed" retry. +// RetryWithBackoff is a simple context for a "mixed" retry. // Some of TiDB APIs, say, `ResolveLock` requires a `tikv.Backoffer` as argument. // But the `tikv.Backoffer` isn't pretty customizable, it has some sorts of predefined configuration but // we cannot create new one. So we are going to mix up the flavour of `tikv.Backoffer` and our homemade -// back off strategy. That is what the `RetryWithBackoffer` did. -type RetryWithBackoffer struct { +// back off strategy. That is what the `RetryWithBackoff` did. 
+type RetryWithBackoff struct { bo *tikv.Backoffer totalBackoff int @@ -133,11 +125,11 @@ type RetryWithBackoffer struct { nextBackoff int } -// AdaptTiKVBackoffer creates an "ad-hoc" backoffer, which wraps a backoffer and provides some new functions: +// AdaptTiKVBackoffer creates an "ad-hoc" backoffStrategy, which wraps a backoffer and provides some new functions: // When backing off, we can manually provide it a specified sleep duration instead of directly provide a retry.Config // Which is sealed in the "client-go/internal". -func AdaptTiKVBackoffer(ctx context.Context, maxSleepMs int, baseErr error) *RetryWithBackoffer { - return &RetryWithBackoffer{ +func AdaptTiKVBackoffer(ctx context.Context, maxSleepMs int, baseErr error) *RetryWithBackoff { + return &RetryWithBackoff{ bo: tikv.NewBackoffer(ctx, maxSleepMs), maxBackoff: maxSleepMs, baseErr: baseErr, @@ -145,25 +137,25 @@ func AdaptTiKVBackoffer(ctx context.Context, maxSleepMs int, baseErr error) *Ret } // NextSleepInMS returns the time `BackOff` will sleep in ms of the state. -func (r *RetryWithBackoffer) NextSleepInMS() int { +func (r *RetryWithBackoff) NextSleepInMS() int { r.mu.Lock() defer r.mu.Unlock() return r.nextBackoff } -// TotalSleepInMS returns the total sleeped time in ms. -func (r *RetryWithBackoffer) TotalSleepInMS() int { +// TotalSleepInMS returns the total slept time in ms. +func (r *RetryWithBackoff) TotalSleepInMS() int { return r.totalBackoff + r.bo.GetTotalSleep() } // MaxSleepInMS returns the max sleep time for the retry context in ms. -func (r *RetryWithBackoffer) MaxSleepInMS() int { +func (r *RetryWithBackoff) MaxSleepInMS() int { return r.maxBackoff } // BackOff executes the back off: sleep for a precalculated backoff time. // See `RequestBackOff` for more details. -func (r *RetryWithBackoffer) BackOff() error { +func (r *RetryWithBackoff) BackOff() error { r.mu.Lock() nextBo := r.nextBackoff r.nextBackoff = 0 @@ -179,24 +171,24 @@ func (r *RetryWithBackoffer) BackOff() error { // RequestBackOff register the intent of backing off at least n milliseconds. // That intent will be fulfilled when calling `BackOff`. -func (r *RetryWithBackoffer) RequestBackOff(ms int) { +func (r *RetryWithBackoff) RequestBackOff(ms int) { r.mu.Lock() r.nextBackoff = max(r.nextBackoff, ms) r.mu.Unlock() } // Inner returns the reference to the inner `backoffer`. 
-func (r *RetryWithBackoffer) Inner() *tikv.Backoffer { +func (r *RetryWithBackoff) Inner() *tikv.Backoffer { return r.bo } -type verboseBackoffer struct { - inner Backoffer +type verboseBackoffStrategy struct { + inner BackoffStrategy logger *zap.Logger groupID uuid.UUID } -func (v *verboseBackoffer) NextBackoff(err error) time.Duration { +func (v *verboseBackoffStrategy) NextBackoff(err error) time.Duration { nextBackoff := v.inner.NextBackoff(err) v.logger.Warn("Encountered err, retrying.", zap.Stringer("nextBackoff", nextBackoff), @@ -205,9 +197,9 @@ func (v *verboseBackoffer) NextBackoff(err error) time.Duration { return nextBackoff } -// Attempt returns the remain attempt times -func (v *verboseBackoffer) Attempt() int { - attempt := v.inner.Attempt() +// RemainingAttempts returns the remain attempt times +func (v *verboseBackoffStrategy) RemainingAttempts() int { + attempt := v.inner.RemainingAttempts() if attempt > 0 { v.logger.Debug("Retry attempt hint.", zap.Int("attempt", attempt), zap.Stringer("gid", v.groupID)) } else { @@ -216,11 +208,11 @@ func (v *verboseBackoffer) Attempt() int { return attempt } -func VerboseRetry(bo Backoffer, logger *zap.Logger) Backoffer { +func VerboseRetry(bo BackoffStrategy, logger *zap.Logger) BackoffStrategy { if logger == nil { logger = log.L() } - vlog := &verboseBackoffer{ + vlog := &verboseBackoffStrategy{ inner: bo, logger: logger, groupID: uuid.New(), @@ -229,7 +221,7 @@ func VerboseRetry(bo Backoffer, logger *zap.Logger) Backoffer { } type failedOnErr struct { - inner Backoffer + inner BackoffStrategy failed bool failedOn []error } @@ -248,15 +240,15 @@ func (f *failedOnErr) NextBackoff(err error) time.Duration { return 0 } -// Attempt returns the remain attempt times -func (f *failedOnErr) Attempt() int { +// RemainingAttempts returns the remain attempt times +func (f *failedOnErr) RemainingAttempts() int { if f.failed { return 0 } - return f.inner.Attempt() + return f.inner.RemainingAttempts() } -func GiveUpRetryOn(bo Backoffer, errs ...error) Backoffer { +func GiveUpRetryOn(bo BackoffStrategy, errs ...error) BackoffStrategy { return &failedOnErr{ inner: bo, failed: false, diff --git a/br/pkg/utils/retry_test.go b/br/pkg/utils/retry_test.go index 0d6bb6e1093ad..0441467638865 100644 --- a/br/pkg/utils/retry_test.go +++ b/br/pkg/utils/retry_test.go @@ -58,15 +58,15 @@ func TestFailNowIf(t *testing.T) { // Test NextBackoff with an error that is not in failedOn assert.Equal(time.Second, bo.NextBackoff(err2)) - assert.NotEqualValues(0, bo.Attempt()) + assert.NotEqualValues(0, bo.RemainingAttempts()) annotatedErr := errors.Annotate(errors.Annotate(err1, "meow?"), "nya?") assert.Equal(time.Duration(0), bo.NextBackoff(annotatedErr)) - assert.Equal(0, bo.Attempt()) + assert.Equal(0, bo.RemainingAttempts()) mockBO = utils.InitialRetryState(100, time.Second, time.Second) bo = utils.GiveUpRetryOn(&mockBO, berrors.ErrBackupNoLeader) annotatedErr = berrors.ErrBackupNoLeader.FastGen("leader is taking an adventure") assert.Equal(time.Duration(0), bo.NextBackoff(annotatedErr)) - assert.Equal(0, bo.Attempt()) + assert.Equal(0, bo.RemainingAttempts()) } diff --git a/br/tests/br_file_corruption/run.sh b/br/tests/br_file_corruption/run.sh index 60907ac2e7a4c..cd78ac7370a11 100644 --- a/br/tests/br_file_corruption/run.sh +++ b/br/tests/br_file_corruption/run.sh @@ -33,11 +33,11 @@ for filename in $(find $TEST_DIR/$DB -name "*.sst"); do mv "$filename" "$filename_bak" done -# need to drop db otherwise restore will fail because of cluster not fresh but not the 
expected issue +# need to drop db otherwise restore will fail because of cluster not fresh but not the expected issue run_sql "DROP DATABASE IF EXISTS $DB;" # file lost -export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/utils/set-import-attempt-to-one=return(true)" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/utils/set-remaining-attempts-to-one=return(true)" restore_fail=0 run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$DB" || restore_fail=1 export GO_FAILPOINTS="" @@ -53,7 +53,7 @@ for filename in $(find $TEST_DIR/$DB -name "*.sst_temp"); do truncate -s -11 "${filename%_temp}" done -export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/utils/set-import-attempt-to-one=return(true)" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/utils/set-remaining-attempts-to-one=return(true)" restore_fail=0 run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$DB" || restore_fail=1 export GO_FAILPOINTS="" diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh index 02d85e1170589..0ecc8cfb9d458 100644 --- a/br/tests/br_pitr/run.sh +++ b/br/tests/br_pitr/run.sh @@ -186,7 +186,7 @@ file_corruption() { # file corruption file_corruption -export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/utils/set-import-attempt-to-one=return(true)" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/utils/set-remaining-attempts-to-one=return(true)" restore_fail=0 run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" || restore_fail=1 export GO_FAILPOINTS="" @@ -210,7 +210,7 @@ file_lost() { # file lost file_lost -export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/utils/set-import-attempt-to-one=return(true)" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/utils/set-remaining-attempts-to-one=return(true)" restore_fail=0 run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" || restore_fail=1 export GO_FAILPOINTS="" diff --git a/dumpling/export/retry.go b/dumpling/export/retry.go index d76cdf2281614..4220ad590634c 100644 --- a/dumpling/export/retry.go +++ b/dumpling/export/retry.go @@ -24,7 +24,7 @@ const ( ) type backOfferResettable interface { - utils.Backoffer + utils.BackoffStrategy Reset() } @@ -61,7 +61,7 @@ func (b *dumpChunkBackoffer) NextBackoff(err error) time.Duration { return b.delayTime } -func (b *dumpChunkBackoffer) Attempt() int { +func (b *dumpChunkBackoffer) RemainingAttempts() int { return b.attempt } @@ -79,7 +79,7 @@ func (b *noopBackoffer) NextBackoff(_ error) time.Duration { return time.Duration(0) } -func (b *noopBackoffer) Attempt() int { +func (b *noopBackoffer) RemainingAttempts() int { return b.attempt } @@ -128,7 +128,7 @@ func (b *lockTablesBackoffer) NextBackoff(err error) time.Duration { return 0 } -func (b *lockTablesBackoffer) Attempt() int { +func (b *lockTablesBackoffer) RemainingAttempts() int { return b.attempt }
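Judging from the dumpling adapters above and the retry loop in br/pkg/utils/retry.go, the contract a caller appears to need to satisfy after this rename is just NextBackoff(error) time.Duration plus RemainingAttempts() int. The following is a minimal sketch of a caller-side strategy wired into WithRetryV2; the fixedBackoff type, its constants, and the failing closure are illustrative assumptions, not code from the repository.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/pingcap/tidb/br/pkg/utils"
)

// fixedBackoff waits a constant delay between attempts and gives up after a
// fixed budget, similar in spirit to the dumpling backoffers adapted above.
type fixedBackoff struct {
	remaining int
	delay     time.Duration
}

// NextBackoff consumes one attempt and returns the constant delay.
func (b *fixedBackoff) NextBackoff(_ error) time.Duration {
	b.remaining--
	return b.delay
}

// RemainingAttempts reports how many attempts are left; WithRetryV2 stops
// looping once this reaches zero.
func (b *fixedBackoff) RemainingAttempts() int {
	return b.remaining
}

func main() {
	strategy := &fixedBackoff{remaining: 3, delay: 50 * time.Millisecond}
	// WithRetryV2 keeps invoking the closure while RemainingAttempts() > 0,
	// sleeps NextBackoff(err) between attempts, and aggregates the errors.
	_, err := utils.WithRetryV2(context.Background(), strategy,
		func(ctx context.Context) (struct{}, error) {
			return struct{}{}, errors.New("transient failure")
		})
	fmt.Println(err) // aggregated "transient failure" errors from three attempts
}

For a constant policy like this one, the existing utils.ConstantBackoff (already used in prepare_snap and the tests in this patch) can be passed directly; the sketch only shows what any new implementation has to provide now that Attempt has become RemainingAttempts.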