br: refactor br cli backoff logic (#54644)
close #54643
Tristan1900 authored Nov 26, 2024
1 parent 5ac4f69 commit 138386c
Showing 29 changed files with 493 additions and 443 deletions.
12 changes: 7 additions & 5 deletions br/pkg/backup/prepare_snap/env.go
@@ -176,15 +176,17 @@ func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endK

type RetryAndSplitRequestEnv struct {
Env
GetBackoffer func() utils.Backoffer
GetBackoffStrategy func() utils.BackoffStrategy
}

func (r RetryAndSplitRequestEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
rs := utils.ConstantBackoff(10 * time.Second)
bo := utils.Backoffer(rs)
if r.GetBackoffer != nil {
bo = r.GetBackoffer()
var bo utils.BackoffStrategy
if r.GetBackoffStrategy != nil {
bo = r.GetBackoffStrategy()
} else {
bo = utils.ConstantBackoff(10 * time.Second)
}

cli, err := utils.WithRetryV2(ctx, bo, func(ctx context.Context) (PrepareClient, error) {
cli, err := r.Env.ConnectToStore(ctx, storeID)
if err != nil {
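The hunk above replaces the concrete GetBackoffer hook with a GetBackoffStrategy factory, falling back to a 10-second constant backoff when the caller supplies nothing. Below is a minimal, self-contained sketch of that pattern; the BackoffStrategy interface, constantBackoff type, and withRetryV2 helper are illustrative stand-ins for the br/pkg/utils definitions, not the actual implementation.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// BackoffStrategy is an illustrative stand-in: it reports how many attempts
// remain and how long to wait after a failed attempt.
type BackoffStrategy interface {
	NextBackoff(err error) time.Duration
	RemainingAttempts() int
}

// constantBackoff waits a fixed duration between a fixed number of attempts,
// mirroring the constant default used when GetBackoffStrategy is nil.
type constantBackoff struct {
	wait     time.Duration
	attempts int
}

func (c constantBackoff) NextBackoff(error) time.Duration { return c.wait }
func (c constantBackoff) RemainingAttempts() int          { return c.attempts }

// withRetryV2 retries fn until it succeeds, attempts run out, or ctx is done,
// and returns fn's value on success.
func withRetryV2[T any](ctx context.Context, bo BackoffStrategy, fn func(context.Context) (T, error)) (T, error) {
	var zero T
	var lastErr error
	for i := 0; i < bo.RemainingAttempts(); i++ {
		v, err := fn(ctx)
		if err == nil {
			return v, nil
		}
		lastErr = err
		select {
		case <-ctx.Done():
			return zero, ctx.Err()
		case <-time.After(bo.NextBackoff(err)):
		}
	}
	return zero, lastErr
}

func main() {
	// Default strategy, analogous to the else branch above (shorter wait for the demo).
	bo := constantBackoff{wait: 100 * time.Millisecond, attempts: 3}

	calls := 0
	addr, err := withRetryV2(context.Background(), bo, func(ctx context.Context) (string, error) {
		calls++
		if calls < 3 {
			return "", errors.New("transient connect failure")
		}
		return "store-1:20160", nil
	})
	fmt.Println(addr, err, "calls:", calls)
}
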
5 changes: 2 additions & 3 deletions br/pkg/backup/prepare_snap/prepare_test.go
@@ -428,9 +428,8 @@ func TestRetryEnv(t *testing.T) {
return nil
}
ms := RetryAndSplitRequestEnv{Env: tms}
ms.GetBackoffer = func() utils.Backoffer {
o := utils.InitialRetryState(2, 0, 0)
return &o
ms.GetBackoffStrategy = func() utils.BackoffStrategy {
return utils.NewBackoffRetryAllErrorStrategy(2, 0, 0)
}
prep := New(ms)
ctx := context.Background()
2 changes: 1 addition & 1 deletion br/pkg/backup/store.go
@@ -275,7 +275,7 @@ func startBackup(
}
return nil
})
}, utils.NewBackupSSTBackoffer())
}, utils.NewBackupSSTBackoffStrategy())
})
}
return eg.Wait()
2 changes: 1 addition & 1 deletion br/pkg/checkpoint/external_storage.go
@@ -85,7 +85,7 @@ func (s *externalCheckpointStorage) getTS(ctx context.Context) (int64, int64, er
}

return nil
}, utils.NewPDReqBackoffer())
}, utils.NewAggressivePDBackoffStrategy())

return p, l, errors.Trace(errRetry)
}
4 changes: 1 addition & 3 deletions br/pkg/checksum/executor.go
@@ -369,8 +369,6 @@ func (exec *Executor) Execute(
updateFn func(),
) (*tipb.ChecksumResponse, error) {
checksumResp := &tipb.ChecksumResponse{}
checksumBackoffer := utils.InitialRetryState(utils.ChecksumRetryTime,
utils.ChecksumWaitInterval, utils.ChecksumMaxWaitInterval)
for _, req := range exec.reqs {
// Pointer to SessionVars.Killed
// Killed is a flag to indicate that this query is killed.
@@ -397,7 +395,7 @@
return errors.Trace(err)
}
return nil
}, &checksumBackoffer)
}, utils.NewChecksumBackoffStrategy())
if err != nil {
return nil, errors.Trace(err)
}
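The checksum path previously assembled its retry state inline from three exported constants; it now asks for a ready-made strategy by name. The general pattern of hiding tuned retry parameters behind a per-use-case constructor can be sketched as below; the names and values are placeholders, not the actual br settings.

package main

import (
	"fmt"
	"time"
)

// retryConfig bundles the knobs that call sites previously assembled by hand.
type retryConfig struct {
	maxAttempts int
	baseWait    time.Duration
	maxWait     time.Duration
}

// newChecksumRetryConfig keeps the checksum-specific tuning in one place.
// The values are illustrative placeholders.
func newChecksumRetryConfig() retryConfig {
	return retryConfig{maxAttempts: 8, baseWait: time.Second, maxWait: 30 * time.Second}
}

func main() {
	cfg := newChecksumRetryConfig()
	fmt.Printf("checksum requests: retry up to %d times, waiting %v to %v between attempts\n",
		cfg.maxAttempts, cfg.baseWait, cfg.maxWait)
}
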
4 changes: 2 additions & 2 deletions br/pkg/conn/conn.go
@@ -116,7 +116,7 @@ func GetAllTiKVStoresWithRetry(ctx context.Context,

return errors.Trace(err)
},
utils.NewPDReqBackoffer(),
utils.NewAggressivePDBackoffStrategy(),
)

return stores, errors.Trace(errRetry)
@@ -420,7 +420,7 @@ func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func
return err
}
return nil
}, utils.NewPDReqBackoffer())
}, utils.NewAggressivePDBackoffStrategy())
if err != nil {
// if one store failed, break and return error
return err
10 changes: 5 additions & 5 deletions br/pkg/encryption/master_key/kms_backend.go
@@ -5,6 +5,7 @@ package encryption
import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/encryptionpb"
@@ -55,12 +56,11 @@ func (k *KmsBackend) Decrypt(ctx context.Context, content *encryptionpb.Encrypte
return k.state.cached.encryptionBackend.DecryptContent(ctx, content)
}

// piggyback on NewDownloadSSTBackoffer, a refactor is ongoing to remove all the backoffers
// so user don't need to write a backoffer for every type
decryptedKey, err :=
utils.WithRetryV2(ctx, utils.NewDownloadSSTBackoffer(), func(ctx context.Context) ([]byte, error) {
return k.kmsProvider.DecryptDataKey(ctx, ciphertextKey)
})
utils.WithRetryV2(ctx, utils.NewBackoffRetryAllErrorStrategy(10, 500*time.Millisecond, 5*time.Second),
func(ctx context.Context) ([]byte, error) {
return k.kmsProvider.DecryptDataKey(ctx, ciphertextKey)
})
if err != nil {
return nil, errors.Annotate(err, "decrypt encrypted key failed")
}
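The KMS decrypt call now retries every error with NewBackoffRetryAllErrorStrategy(10, 500*time.Millisecond, 5*time.Second). Assuming the usual doubling-with-a-cap schedule (an assumption about the strategy's behavior, not something this diff states), the waits come out to roughly 0.5s, 1s, 2s, 4s, then 5s for the remaining attempts, about 37.5s worst case. A small sketch of that arithmetic:

package main

import (
	"fmt"
	"time"
)

// cappedExponentialWaits lists the assumed sleep before each retry: the delay
// doubles from base until it reaches maxWait, then stays there.
func cappedExponentialWaits(attempts int, base, maxWait time.Duration) []time.Duration {
	waits := make([]time.Duration, 0, attempts)
	d := base
	for i := 0; i < attempts; i++ {
		waits = append(waits, d)
		d *= 2
		if d > maxWait {
			d = maxWait
		}
	}
	return waits
}

func main() {
	var total time.Duration
	for i, w := range cappedExponentialWaits(10, 500*time.Millisecond, 5*time.Second) {
		total += w
		fmt.Printf("retry %2d: wait %v\n", i+1, w)
	}
	fmt.Println("worst-case total wait:", total) // 0.5 + 1 + 2 + 4 + 6*5 = 37.5s
}
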
47 changes: 17 additions & 30 deletions br/pkg/restore/data/data.go
@@ -3,6 +3,7 @@ package data

import (
"context"
stdErr "errors"
"io"
"time"

@@ -62,40 +63,27 @@ type recoveryError struct {
atStage RecoveryStage
}

func FailedAt(err error) RecoveryStage {
if rerr, ok := err.(recoveryError); ok {
return rerr.atStage
func atStage(err error) RecoveryStage {
var recoveryErr recoveryError
if stdErr.As(err, &recoveryErr) {
return recoveryErr.atStage
}
return StageUnknown
}

type recoveryBackoffer struct {
state utils.RetryState
}

func newRecoveryBackoffer() *recoveryBackoffer {
return &recoveryBackoffer{
state: utils.InitialRetryState(16, 30*time.Second, 4*time.Minute),
}
}

func (bo *recoveryBackoffer) NextBackoff(err error) time.Duration {
s := FailedAt(err)
switch s {
func isRetryErr(err error) bool {
stage := atStage(err)
switch stage {
case StageCollectingMeta, StageMakingRecoveryPlan, StageResetPDAllocateID, StageRecovering:
log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", s))
return bo.state.ExponentialBackoff()
log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", stage))
return true
case StageFlashback:
log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", s))
bo.state.GiveUp()
return 0
log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", stage))
return false
default:
log.Warn("unknown stage of recovery for backoff.", zap.Int("val", int(stage)))
return false
}
log.Warn("unknown stage of backing off.", zap.Int("val", int(s)))
return bo.state.ExponentialBackoff()
}

func (bo *recoveryBackoffer) Attempt() int {
return bo.state.Attempt()
}

// RecoverData recover the tikv cluster
@@ -109,7 +97,7 @@ func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Stor
// Roughly handle the case that some TiKVs are rebooted during making plan.
// Generally, retry the whole procedure will be fine for most cases. But perhaps we can do finer-grained retry,
// say, we may reuse the recovery plan, and probably no need to rebase PD allocation ID once we have done it.
return utils.WithRetryV2(ctx, newRecoveryBackoffer(), func(ctx context.Context) (int, error) {
return utils.WithRetryV2(ctx, utils.NewRecoveryBackoffStrategy(isRetryErr), func(ctx context.Context) (int, error) {
return doRecoveryData(ctx, resolveTS, allStores, mgr, progress, restoreTS, concurrency)
})
}
@@ -395,7 +383,6 @@ func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) {

// prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
retryState := utils.InitialRetryState(utils.FlashbackRetryTime, utils.FlashbackWaitInterval, utils.FlashbackMaxWaitInterval)
retryErr := utils.WithRetry(
ctx,
func() error {
@@ -416,7 +403,7 @@ }
}
log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))
return nil
}, &retryState)
}, utils.NewFlashBackBackoffStrategy())

recovery.progress.Inc()
return retryErr
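The bespoke recoveryBackoffer is gone; RecoverData now passes a plain classifier, isRetryErr, into the shared NewRecoveryBackoffStrategy. A self-contained sketch of that shape, a retry loop driven by an error classifier; the helper names and the fixed wait below are illustrative, not the br implementation.

package main

import (
	"errors"
	"fmt"
	"time"
)

type stage int

const (
	stageCollectingMeta stage = iota
	stageFlashback
)

// recoveryError tags an error with the stage it occurred in, like the diff above.
type recoveryError struct {
	err     error
	atStage stage
}

func (e recoveryError) Error() string { return fmt.Sprintf("stage %d: %v", e.atStage, e.err) }

// isRetryable mirrors the classifier pattern: retry early stages, give up on flashback.
func isRetryable(err error) bool {
	var rerr recoveryError
	if errors.As(err, &rerr) {
		return rerr.atStage != stageFlashback
	}
	return false
}

// retryWithClassifier keeps retrying while the classifier says the error is retryable.
func retryWithClassifier(attempts int, wait time.Duration, retryable func(error) bool, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if !retryable(err) {
			return err // non-retryable: surface immediately
		}
		time.Sleep(wait)
	}
	return err
}

func main() {
	calls := 0
	err := retryWithClassifier(16, 10*time.Millisecond, isRetryable, func() error {
		calls++
		if calls < 3 {
			return recoveryError{err: errors.New("tikv rebooted"), atStage: stageCollectingMeta}
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
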
4 changes: 2 additions & 2 deletions br/pkg/restore/log_client/import.go
@@ -137,8 +137,8 @@ func (importer *LogFileImporter) ImportKVFiles(

// This RetryState will retry 45 times, about 10 min.
rs := utils.InitialRetryState(45, 100*time.Millisecond, 15*time.Second)
ctl := OverRegionsInRange(startKey, endKey, importer.metaClient, &rs)
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
ctl := CreateRangeController(startKey, endKey, importer.metaClient, &rs)
err = ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
subfiles, errFilter := filterFilesByRegion(files, ranges, r)
if errFilter != nil {
return RPCResultFromError(errFilter)
79 changes: 32 additions & 47 deletions br/pkg/restore/log_client/import_retry.go
@@ -25,7 +25,10 @@ import (

type RegionFunc func(ctx context.Context, r *split.RegionInfo) RPCResult

type OverRegionsInRangeController struct {
// RangeController manages the execution of operations over a range of regions.
// It provides functionality to scan regions within a specified key range and
// apply a given function to each region, handling errors and retries automatically.
type RangeController struct {
start []byte
end []byte
metaClient split.SplitClient
@@ -34,60 +37,49 @@ type OverRegionsInRangeController struct {
rs *utils.RetryState
}

// OverRegionsInRange creates a controller that could be used to scan regions in a range and
// CreateRangeController creates a controller that could be used to scan regions in a range and
// apply a function over these regions.
// You can then call the `Run` method for applying some functions.
func OverRegionsInRange(start, end []byte, metaClient split.SplitClient, retryStatus *utils.RetryState) OverRegionsInRangeController {
func CreateRangeController(start, end []byte, metaClient split.SplitClient, retryStatus *utils.RetryState) RangeController {
// IMPORTANT: we record the start/end key with TimeStamp.
// but scanRegion will drop the TimeStamp and the end key is exclusive.
// if we do not use PrefixNextKey, we might scan fewer regions than we expected
// and finally cause data loss.
end = restoreutils.TruncateTS(end)
end = kv.PrefixNextKey(end)

return OverRegionsInRangeController{
return RangeController{
start: start,
end: end,
metaClient: metaClient,
rs: retryStatus,
}
}

func (o *OverRegionsInRangeController) onError(_ context.Context, result RPCResult, region *split.RegionInfo) {
func (o *RangeController) onError(_ context.Context, result RPCResult, region *split.RegionInfo) {
o.errors = multierr.Append(o.errors, errors.Annotatef(&result, "execute over region %v failed", region.Region))
// TODO: Maybe handle some of region errors like `epoch not match`?
}

func (o *OverRegionsInRangeController) tryFindLeader(ctx context.Context, region *split.RegionInfo) (*metapb.Peer, error) {
var leader *metapb.Peer
failed := false
leaderRs := utils.InitialRetryState(4, 5*time.Second, 10*time.Second)
err := utils.WithRetry(ctx, func() error {
func (o *RangeController) tryFindLeader(ctx context.Context, region *split.RegionInfo) (*metapb.Peer, error) {
backoffStrategy := utils.NewBackoffRetryAllErrorStrategy(4, 2*time.Second, 10*time.Second)
return utils.WithRetryV2(ctx, backoffStrategy, func(ctx context.Context) (*metapb.Peer, error) {
r, err := o.metaClient.GetRegionByID(ctx, region.Region.Id)
if err != nil {
return err
return nil, err
}
if !split.CheckRegionEpoch(r, region) {
failed = true
return nil
return nil, errors.Annotatef(berrors.ErrKVEpochNotMatch, "the current epoch of %s has changed", region)
}
if r.Leader != nil {
leader = r.Leader
return nil
return r.Leader, nil
}
return errors.Annotatef(berrors.ErrPDLeaderNotFound, "there is no leader for region %d", region.Region.Id)
}, &leaderRs)
if failed {
return nil, errors.Annotatef(berrors.ErrKVEpochNotMatch, "the current epoch of %s is changed", region)
}
if err != nil {
return nil, err
}
return leader, nil
return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound, "there is no leader for region %d", region.Region.Id)
})
}
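tryFindLeader previously smuggled its result out of the WithRetry closure through captured leader and failed variables; with WithRetryV2 the closure returns the leader (or a classified error) directly. A minimal sketch of the two shapes, using hypothetical local helpers rather than the real utils functions:

package main

import (
	"context"
	"fmt"
)

// withRetry is the error-only flavor: results must escape via captured variables.
func withRetry(ctx context.Context, attempts int, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return err
}

// withRetryV2 returns the value produced by the first successful attempt.
func withRetryV2[T any](ctx context.Context, attempts int, fn func(context.Context) (T, error)) (T, error) {
	var zero T
	var err error
	for i := 0; i < attempts; i++ {
		var v T
		if v, err = fn(ctx); err == nil {
			return v, nil
		}
	}
	return zero, err
}

func main() {
	ctx := context.Background()

	// Old style: capture the result and a sentinel flag outside the closure.
	var leader string
	failed := false
	_ = withRetry(ctx, 4, func() error {
		leader, failed = "peer-1", false
		return nil
	})
	fmt.Println(leader, failed)

	// New style: the closure hands the leader back through the return value.
	leader2, err := withRetryV2(ctx, 4, func(ctx context.Context) (string, error) {
		return "peer-1", nil
	})
	fmt.Println(leader2, err)
}
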

// handleInRegionError handles the error happens internal in the region. Update the region info, and perform a suitable backoff.
func (o *OverRegionsInRangeController) handleInRegionError(ctx context.Context, result RPCResult, region *split.RegionInfo) (cont bool) {
// handleRegionError handles the error happens internal in the region. Update the region info, and perform a suitable backoff.
func (o *RangeController) handleRegionError(ctx context.Context, result RPCResult, region *split.RegionInfo) (cont bool) {
if result.StoreError.GetServerIsBusy() != nil {
if strings.Contains(result.StoreError.GetMessage(), "memory is limited") {
sleepDuration := 15 * time.Second
@@ -126,35 +118,32 @@ func (o *OverRegionsInRangeController) handleInRegionError(ctx context.Context,
return true
}

func (o *OverRegionsInRangeController) prepareLogCtx(ctx context.Context) context.Context {
lctx := logutil.ContextWithField(
func (o *RangeController) prepareLogCtx(ctx context.Context) context.Context {
return logutil.ContextWithField(
ctx,
logutil.Key("startKey", o.start),
logutil.Key("endKey", o.end),
)
return lctx
}

// Run executes the `regionFunc` over the regions in `o.start` and `o.end`.
// It would retry the errors according to the `rpcResponse`.
func (o *OverRegionsInRangeController) Run(ctx context.Context, f RegionFunc) error {
return o.runOverRegions(o.prepareLogCtx(ctx), f)
}
// ApplyFuncToRange applies the `regionFunc` to all regions in `o.start` and `o.end`.
// It would retry errors according to the `rpcResponse`.
func (o *RangeController) ApplyFuncToRange(ctx context.Context, f RegionFunc) error {
adjustedCtx := o.prepareLogCtx(ctx)

func (o *OverRegionsInRangeController) runOverRegions(ctx context.Context, f RegionFunc) error {
if !o.rs.ShouldRetry() {
return o.errors
}

// Scan regions covered by the file range
regionInfos, errScanRegion := split.PaginateScanRegion(
ctx, o.metaClient, o.start, o.end, split.ScanRegionPaginationLimit)
adjustedCtx, o.metaClient, o.start, o.end, split.ScanRegionPaginationLimit)
if errScanRegion != nil {
return errors.Trace(errScanRegion)
}

for _, region := range regionInfos {
cont, err := o.runInRegion(ctx, f, region)
cont, err := o.applyFuncToRegion(adjustedCtx, f, region)
if err != nil {
return err
}
@@ -165,8 +154,8 @@ func (o *OverRegionsInRangeController) runOverRegions(ctx context.Context, f Reg
return nil
}

// runInRegion executes the function in the region, and returns `cont = false` if no need for trying for next region.
func (o *OverRegionsInRangeController) runInRegion(ctx context.Context, f RegionFunc, region *split.RegionInfo) (cont bool, err error) {
// applyFuncToRegion executes the function in the region, and returns `cont = false` if no need for trying for next region.
func (o *RangeController) applyFuncToRegion(ctx context.Context, f RegionFunc, region *split.RegionInfo) (cont bool, err error) {
if !o.rs.ShouldRetry() {
return false, o.errors
}
@@ -180,16 +169,16 @@ func (o *OverRegionsInRangeController) runInRegion(ctx context.Context, f Region
return false, o.errors
case StrategyFromThisRegion:
logutil.CL(ctx).Warn("retry for region", logutil.Region(region.Region), logutil.ShortError(&result))
if !o.handleInRegionError(ctx, result, region) {
return false, o.runOverRegions(ctx, f)
if !o.handleRegionError(ctx, result, region) {
return false, o.ApplyFuncToRange(ctx, f)
}
return o.runInRegion(ctx, f, region)
return o.applyFuncToRegion(ctx, f, region)
case StrategyFromStart:
logutil.CL(ctx).Warn("retry for execution over regions", logutil.ShortError(&result))
// TODO: make a backoffer that considers more of the error info,
// instead of ignoring the result and retrying.
time.Sleep(o.rs.ExponentialBackoff())
return false, o.runOverRegions(ctx, f)
return false, o.ApplyFuncToRange(ctx, f)
}
}
return true, nil
@@ -251,10 +240,6 @@ func (r *RPCResult) StrategyForRetryStoreError() RetryStrategy {
}

func (r *RPCResult) StrategyForRetryGoError() RetryStrategy {
if r.Err == nil {
return StrategyGiveUp
}

// we should unwrap the error or we cannot get the write gRPC status.
if gRPCErr, ok := status.FromError(errors.Cause(r.Err)); ok {
switch gRPCErr.Code() {
(The diff for the remaining changed files was not loaded in this view.)
