From 10d707ff43a98c806dd16e25eae8bceb1ab493e9 Mon Sep 17 00:00:00 2001 From: shirly Date: Mon, 10 May 2021 20:15:06 +0800 Subject: [PATCH 1/3] store/tikv: move Backoffer into a single package --- store/driver/tikv_driver.go | 4 +- store/gcworker/gc_worker.go | 14 +- store/tikv/2pc.go | 84 ++--- store/tikv/backoff.go | 420 ++--------------------- store/tikv/cleanup.go | 3 +- store/tikv/client_batch.go | 5 +- store/tikv/commit.go | 15 +- store/tikv/delete_range.go | 7 +- store/tikv/kv.go | 17 +- store/tikv/lock_resolver.go | 29 +- store/tikv/pessimistic.go | 9 +- store/tikv/prewrite.go | 23 +- store/tikv/range_task.go | 5 +- store/tikv/rawkv.go | 21 +- store/tikv/region_cache.go | 49 +-- store/tikv/region_cache_test.go | 7 +- store/tikv/region_request.go | 52 +-- store/tikv/region_request_test.go | 5 +- store/tikv/retry/backoff.go | 439 +++++++++++++++++++++++++ store/tikv/{ => retry}/backoff_test.go | 2 +- store/tikv/scan.go | 9 +- store/tikv/snapshot.go | 45 +-- store/tikv/split_region.go | 36 +- store/tikv/test_probe.go | 15 +- store/tikv/tests/lock_test.go | 2 +- store/tikv/txn.go | 19 +- 26 files changed, 725 insertions(+), 611 deletions(-) create mode 100644 store/tikv/retry/backoff.go rename store/tikv/{ => retry}/backoff_test.go (98%) diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index 17308d33c6be3..84782e59fe360 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -206,6 +206,8 @@ var ( ldflagGetEtcdAddrsFromConfig = "0" // 1:Yes, otherwise:No ) +const getAllMembersBackoff = 5000 + // EtcdAddrs returns etcd server addresses. func (s *tikvStore) EtcdAddrs() ([]string, error) { if s.etcdAddrs == nil { @@ -220,7 +222,7 @@ func (s *tikvStore) EtcdAddrs() ([]string, error) { } ctx := context.Background() - bo := tikv.NewBackoffer(ctx, tikv.GetAllMembersBackoff) + bo := tikv.NewBackoffer(ctx, getAllMembersBackoff) etcdAddrs := make([]string, 0) pdClient := s.GetPDClient() if pdClient == nil { diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go index 038efa30b92f3..e0aa993558b6c 100644 --- a/store/gcworker/gc_worker.go +++ b/store/gcworker/gc_worker.go @@ -1050,7 +1050,7 @@ func (w *GCWorker) resolveLocksForRange(ctx context.Context, safePoint uint64, s var stat tikv.RangeTaskStat key := startKey - bo := tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil) + bo := tikv.NewGcResolveLockMaxBackoffer(ctx) failpoint.Inject("setGcResolveMaxBackoff", func(v failpoint.Value) { sleep := v.(int) // cooperate with github.com/pingcap/tidb/store/tikv/invalidCacheAndRetry @@ -1147,7 +1147,7 @@ retryScanAndResolve: if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) { break } - bo = tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil) + bo = tikv.NewGcResolveLockMaxBackoffer(ctx) failpoint.Inject("setGcResolveMaxBackoff", func(v failpoint.Value) { sleep := v.(int) bo = tikv.NewBackofferWithVars(ctx, sleep, nil) @@ -1460,7 +1460,7 @@ func (w *GCWorker) resolveLocksAcrossRegions(ctx context.Context, locks []*tikv. failpoint.Return(errors.New("injectedError")) }) - bo := tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil) + bo := tikv.NewGcResolveLockMaxBackoffer(ctx) for { if len(locks) == 0 { @@ -1496,18 +1496,20 @@ func (w *GCWorker) resolveLocksAcrossRegions(ctx context.Context, locks []*tikv. 
} // Recreate backoffer for next region - bo = tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil) + bo = tikv.NewGcResolveLockMaxBackoffer(ctx) locks = locks[len(locksInRegion):] } return nil } +const gcOneRegionMaxBackoff = 20000 + func (w *GCWorker) uploadSafePointToPD(ctx context.Context, safePoint uint64) error { var newSafePoint uint64 var err error - bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil) + bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil) for { newSafePoint, err = w.pdClient.UpdateGCSafePoint(ctx, safePoint) if err != nil { @@ -1544,7 +1546,7 @@ func (w *GCWorker) doGCForRange(ctx context.Context, startKey []byte, endKey []b }() key := startKey for { - bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil) + bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil) loc, err := w.tikvStore.GetRegionCache().LocateKey(bo, key) if err != nil { return stat, errors.Trace(err) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 8703b1861c65d..7a1e55ae3ea05 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/store/tikv/unionstore" "github.com/pingcap/tidb/store/tikv/util" @@ -542,7 +543,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh switch act := action.(type) { case actionPrewrite: // Do not update regionTxnSize on retries. They are not used when building a PrewriteRequest. - if len(bo.errors) == 0 { + if !bo.HasErrors() { for _, group := range groups { c.regionTxnSize[group.region.id] = group.mutations.Len() } @@ -572,7 +573,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh valStr, ok := val.(string) if ok && c.sessionID > 0 { if firstIsPrimary && actionIsPessimiticLock { - logutil.Logger(bo.ctx).Warn("pessimisticLock failpoint", zap.String("valStr", valStr)) + logutil.Logger(bo.GetCtx()).Warn("pessimisticLock failpoint", zap.String("valStr", valStr)) switch valStr { case "pessimisticLockSkipPrimary": err = c.doActionOnBatches(bo, action, batchBuilder.allBatches()) @@ -587,7 +588,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh failpoint.Inject("pessimisticRollbackDoNth", func() { _, actionIsPessimisticRollback := action.(actionPessimisticRollback) if actionIsPessimisticRollback && c.sessionID > 0 { - logutil.Logger(bo.ctx).Warn("pessimisticRollbackDoNth failpoint") + logutil.Logger(bo.GetCtx()).Warn("pessimisticRollbackDoNth failpoint") failpoint.Return(nil) } }) @@ -607,16 +608,16 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh } // Already spawned a goroutine for async commit transaction. 
if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() { - secondaryBo := NewBackofferWithVars(context.Background(), int(atomic.LoadUint64(&CommitMaxBackoff)), c.txn.vars) + secondaryBo := retry.NewBackofferWithVars(context.Background(), int(atomic.LoadUint64(&CommitMaxBackoff)), c.txn.vars) go func() { if c.sessionID > 0 { failpoint.Inject("beforeCommitSecondaries", func(v failpoint.Value) { if s, ok := v.(string); !ok { - logutil.Logger(bo.ctx).Info("[failpoint] sleep 2s before commit secondary keys", + logutil.Logger(bo.GetCtx()).Info("[failpoint] sleep 2s before commit secondary keys", zap.Uint64("sessionID", c.sessionID), zap.Uint64("txnStartTS", c.startTS), zap.Uint64("txnCommitTS", c.commitTS)) time.Sleep(2 * time.Second) } else if s == "skip" { - logutil.Logger(bo.ctx).Info("[failpoint] injected skip committing secondaries", + logutil.Logger(bo.GetCtx()).Info("[failpoint] injected skip committing secondaries", zap.Uint64("sessionID", c.sessionID), zap.Uint64("txnStartTS", c.startTS), zap.Uint64("txnCommitTS", c.commitTS)) failpoint.Return() } @@ -722,6 +723,8 @@ func (tm *ttlManager) close() { close(tm.ch) } +const pessimisticLockMaxBackoff = 20000 + func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { // Ticker is set to 1/2 of the ManagedLockTTL. ticker := time.NewTicker(time.Duration(atomic.LoadUint64(&ManagedLockTTL)) * time.Millisecond / 2) @@ -735,12 +738,12 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { if tm.lockCtx != nil && tm.lockCtx.Killed != nil && atomic.LoadUint32(tm.lockCtx.Killed) != 0 { return } - bo := NewBackofferWithVars(context.Background(), pessimisticLockMaxBackoff, c.txn.vars) - now, err := c.store.GetOracle().GetTimestamp(bo.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + bo := retry.NewBackofferWithVars(context.Background(), pessimisticLockMaxBackoff, c.txn.vars) + now, err := c.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) if err != nil { - err1 := bo.Backoff(BoPDRPC, err) + err1 := bo.Backoff(retry.BoPDRPC, err) if err1 != nil { - logutil.Logger(bo.ctx).Warn("keepAlive get tso fail", + logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail", zap.Error(err)) return } @@ -751,7 +754,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { if uptime > config.GetGlobalConfig().MaxTxnTTL { // Checks maximum lifetime for the ttlManager, so when something goes wrong // the key will not be locked forever. 
- logutil.Logger(bo.ctx).Info("ttlManager live up to its lifetime", + logutil.Logger(bo.GetCtx()).Info("ttlManager live up to its lifetime", zap.Uint64("txnStartTS", c.startTS), zap.Uint64("uptime", uptime), zap.Uint64("maxTxnTTL", config.GetGlobalConfig().MaxTxnTTL)) @@ -765,13 +768,13 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { } newTTL := uptime + atomic.LoadUint64(&ManagedLockTTL) - logutil.Logger(bo.ctx).Info("send TxnHeartBeat", + logutil.Logger(bo.GetCtx()).Info("send TxnHeartBeat", zap.Uint64("startTS", c.startTS), zap.Uint64("newTTL", newTTL)) startTime := time.Now() _, err = sendTxnHeartBeat(bo, c.store, c.primary(), c.startTS, newTTL) if err != nil { metrics.TxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds()) - logutil.Logger(bo.ctx).Warn("send TxnHeartBeat failed", + logutil.Logger(bo.GetCtx()).Warn("send TxnHeartBeat failed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) return @@ -801,7 +804,7 @@ func sendTxnHeartBeat(bo *Backoffer, store *KVStore, primary []byte, startTS, tt return 0, errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return 0, errors.Trace(err) } @@ -893,6 +896,11 @@ func (c *twoPhaseCommitter) checkOnePCFallBack(action twoPhaseCommitAction, batc } } +const ( + cleanupMaxBackoff = 20000 + tsoMaxBackoff = 15000 +) + func (c *twoPhaseCommitter) cleanup(ctx context.Context) { c.cleanWg.Add(1) go func() { @@ -903,12 +911,12 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) { failpoint.Return() }) - cleanupKeysCtx := context.WithValue(context.Background(), TxnStartKey, ctx.Value(TxnStartKey)) + cleanupKeysCtx := context.WithValue(context.Background(), retry.TxnStartKey, ctx.Value(retry.TxnStartKey)) var err error if !c.isOnePC() { - err = c.cleanupMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations) + err = c.cleanupMutations(retry.NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations) } else if c.isPessimistic { - err = c.pessimisticRollbackMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations) + err = c.pessimisticRollbackMutations(retry.NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations) } if err != nil { @@ -1020,7 +1028,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { if c.shouldWriteBinlog() { binlogChan = c.binlog.Prewrite(ctx, c.primary()) } - prewriteBo := NewBackofferWithVars(ctx, PrewriteMaxBackoff, c.txn.vars) + prewriteBo := retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, c.txn.vars) start := time.Now() err = c.prewriteMutations(prewriteBo, c.mutations) @@ -1039,10 +1047,10 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { commitDetail := c.getDetail() commitDetail.PrewriteTime = time.Since(start) - if prewriteBo.totalSleep > 0 { - atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(prewriteBo.totalSleep)*int64(time.Millisecond)) + if prewriteBo.GetTotalSleep() > 0 { + atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(prewriteBo.GetTotalSleep())*int64(time.Millisecond)) commitDetail.Mu.Lock() - commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, prewriteBo.types...) + commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, prewriteBo.GetTypes()...) 
commitDetail.Mu.Unlock() } if binlogChan != nil { @@ -1096,7 +1104,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } else { start = time.Now() logutil.Event(ctx, "start get commit ts") - commitTS, err = c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetScope()) + commitTS, err = c.store.getTimestampWithRetry(retry.NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetScope()) if err != nil { logutil.Logger(ctx).Warn("2PC get commitTS failed", zap.Error(err), @@ -1179,7 +1187,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { failpoint.Inject("asyncCommitDoNothing", func() { failpoint.Return() }) - commitBo := NewBackofferWithVars(ctx, int(atomic.LoadUint64(&CommitMaxBackoff)), c.txn.vars) + commitBo := retry.NewBackofferWithVars(ctx, int(atomic.LoadUint64(&CommitMaxBackoff)), c.txn.vars) err := c.commitMutations(commitBo, c.mutations) if err != nil { logutil.Logger(ctx).Warn("2PC async commit failed", zap.Uint64("sessionID", c.sessionID), @@ -1195,13 +1203,13 @@ func (c *twoPhaseCommitter) commitTxn(ctx context.Context, commitDetail *util.Co c.txn.GetMemBuffer().DiscardValues() start := time.Now() - commitBo := NewBackofferWithVars(ctx, int(atomic.LoadUint64(&CommitMaxBackoff)), c.txn.vars) + commitBo := retry.NewBackofferWithVars(ctx, int(atomic.LoadUint64(&CommitMaxBackoff)), c.txn.vars) err := c.commitMutations(commitBo, c.mutations) commitDetail.CommitTime = time.Since(start) - if commitBo.totalSleep > 0 { - atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(commitBo.totalSleep)*int64(time.Millisecond)) + if commitBo.GetTotalSleep() > 0 { + atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(commitBo.GetTotalSleep())*int64(time.Millisecond)) commitDetail.Mu.Lock() - commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, commitBo.types...) + commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, commitBo.GetTypes()...) commitDetail.Mu.Unlock() } if err != nil { @@ -1286,7 +1294,7 @@ func (c *twoPhaseCommitter) amendPessimisticLock(ctx context.Context, addMutatio retryLimit := config.GetGlobalConfig().PessimisticTxn.MaxRetryCount var err error for tryTimes < retryLimit { - pessimisticLockBo := NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, c.txn.vars) + pessimisticLockBo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, c.txn.vars) err = c.pessimisticLockMutations(pessimisticLockBo, lCtx, &keysNeedToLock) if err != nil { // KeysNeedToLock won't change, so don't async rollback pessimistic locks here for write conflict. 
@@ -1332,7 +1340,7 @@ func (c *twoPhaseCommitter) tryAmendTxn(ctx context.Context, startInfoSchema Sch return false, err } if c.prewriteStarted { - prewriteBo := NewBackofferWithVars(ctx, PrewriteMaxBackoff, c.txn.vars) + prewriteBo := retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, c.txn.vars) err = c.prewriteMutations(prewriteBo, addMutations) if err != nil { logutil.Logger(ctx).Warn("amend prewrite has failed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) @@ -1365,7 +1373,7 @@ func (c *twoPhaseCommitter) tryAmendTxn(ctx context.Context, startInfoSchema Sch func (c *twoPhaseCommitter) getCommitTS(ctx context.Context, commitDetail *util.CommitDetails) (uint64, error) { start := time.Now() logutil.Event(ctx, "start get commit ts") - commitTS, err := c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetScope()) + commitTS, err := c.store.getTimestampWithRetry(retry.NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetScope()) if err != nil { logutil.Logger(ctx).Warn("2PC get commitTS failed", zap.Error(err), @@ -1579,20 +1587,20 @@ func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error, singleBatchBackoffer, singleBatchCancel = batchExe.backoffer.Fork() defer singleBatchCancel() } - beforeSleep := singleBatchBackoffer.totalSleep + beforeSleep := singleBatchBackoffer.GetTotalSleep() ch <- batchExe.action.handleSingleBatch(batchExe.committer, singleBatchBackoffer, batch) commitDetail := batchExe.committer.getDetail() if commitDetail != nil { // lock operations of pessimistic-txn will let commitDetail be nil - if delta := singleBatchBackoffer.totalSleep - beforeSleep; delta > 0 { - atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(singleBatchBackoffer.totalSleep-beforeSleep)*int64(time.Millisecond)) + if delta := singleBatchBackoffer.GetTotalSleep() - beforeSleep; delta > 0 { + atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(singleBatchBackoffer.GetTotalSleep()-beforeSleep)*int64(time.Millisecond)) commitDetail.Mu.Lock() - commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, singleBatchBackoffer.types...) + commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, singleBatchBackoffer.GetTypes()...) commitDetail.Mu.Unlock() } } }() } else { - logutil.Logger(batchExe.backoffer.ctx).Info("break startWorker", + logutil.Logger(batchExe.backoffer.GetCtx()).Info("break startWorker", zap.Stringer("action", batchExe.action), zap.Int("batch size", len(batches)), zap.Int("index", idx)) break @@ -1605,7 +1613,7 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error { var err error err = batchExe.initUtils() if err != nil { - logutil.Logger(batchExe.backoffer.ctx).Error("batchExecutor initUtils failed", zap.Error(err)) + logutil.Logger(batchExe.backoffer.GetCtx()).Error("batchExecutor initUtils failed", zap.Error(err)) return err } @@ -1622,14 +1630,14 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error { // check results for i := 0; i < len(batches); i++ { if e := <-ch; e != nil { - logutil.Logger(batchExe.backoffer.ctx).Debug("2PC doActionOnBatch failed", + logutil.Logger(batchExe.backoffer.GetCtx()).Debug("2PC doActionOnBatch failed", zap.Uint64("session", batchExe.committer.sessionID), zap.Stringer("action type", batchExe.action), zap.Error(e), zap.Uint64("txnStartTS", batchExe.committer.startTS)) // Cancel other requests and return the first error. 
if cancel != nil { - logutil.Logger(batchExe.backoffer.ctx).Debug("2PC doActionOnBatch to cancel other actions", + logutil.Logger(batchExe.backoffer.GetCtx()).Debug("2PC doActionOnBatch to cancel other actions", zap.Uint64("session", batchExe.committer.sessionID), zap.Stringer("action type", batchExe.action), zap.Uint64("txnStartTS", batchExe.committer.startTS)) diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index e0115c9e3904a..c471ccc16167a 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -1,4 +1,4 @@ -// Copyright 2016 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,431 +15,55 @@ package tikv import ( "context" - "fmt" - "math" - "math/rand" - "strings" - "sync/atomic" - "time" - "github.com/opentracing/opentracing-go" - "github.com/pingcap/errors" - "github.com/pingcap/log" - tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/kv" - "github.com/pingcap/tidb/store/tikv/logutil" - "github.com/pingcap/tidb/store/tikv/metrics" - "github.com/pingcap/tidb/store/tikv/util" - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" + "github.com/pingcap/tidb/store/tikv/retry" ) -const ( - // NoJitter makes the backoff sequence strict exponential. - NoJitter = 1 + iota - // FullJitter applies random factors to strict exponential. - FullJitter - // EqualJitter is also randomized, but prevents very short sleeps. - EqualJitter - // DecorrJitter increases the maximum jitter based on the last random value. - DecorrJitter -) - -func (t BackoffType) metric() prometheus.Observer { - switch t { - // TODO: distinguish tikv and tiflash in metrics - case BoTiKVRPC, BoTiFlashRPC: - return metrics.BackoffHistogramRPC - case BoTxnLock: - return metrics.BackoffHistogramLock - case BoTxnLockFast: - return metrics.BackoffHistogramLockFast - case BoPDRPC: - return metrics.BackoffHistogramPD - case BoRegionMiss: - return metrics.BackoffHistogramRegionMiss - case boTiKVServerBusy, boTiFlashServerBusy: - return metrics.BackoffHistogramServerBusy - case boStaleCmd: - return metrics.BackoffHistogramStaleCmd - } - return metrics.BackoffHistogramEmpty -} - -// NewBackoffFn creates a backoff func which implements exponential backoff with -// optional jitters. -// See http://www.awsarchitectureblog.com/2015/03/backoff.html -func NewBackoffFn(base, cap, jitter int) func(ctx context.Context, maxSleepMs int) int { - if base < 2 { - // Top prevent panic in 'rand.Intn'. - base = 2 - } - attempts := 0 - lastSleep := base - return func(ctx context.Context, maxSleepMs int) int { - var sleep int - switch jitter { - case NoJitter: - sleep = expo(base, cap, attempts) - case FullJitter: - v := expo(base, cap, attempts) - sleep = rand.Intn(v) - case EqualJitter: - v := expo(base, cap, attempts) - sleep = v/2 + rand.Intn(v/2) - case DecorrJitter: - sleep = int(math.Min(float64(cap), float64(base+rand.Intn(lastSleep*3-base)))) - } - logutil.BgLogger().Debug("backoff", - zap.Int("base", base), - zap.Int("sleep", sleep), - zap.Int("attempts", attempts)) - - realSleep := sleep - // when set maxSleepMs >= 0 in `tikv.BackoffWithMaxSleep` will force sleep maxSleepMs milliseconds. 
- if maxSleepMs >= 0 && realSleep > maxSleepMs { - realSleep = maxSleepMs - } - select { - case <-time.After(time.Duration(realSleep) * time.Millisecond): - attempts++ - lastSleep = sleep - return realSleep - case <-ctx.Done(): - return 0 - } - } -} - -func expo(base, cap, n int) int { - return int(math.Min(float64(cap), float64(base)*math.Pow(2.0, float64(n)))) -} +// Backoffer is a utility for retrying queries. +type Backoffer = retry.Backoffer // BackoffType defines the backoff type. -type BackoffType int +type BackoffType = retry.BackoffType // Back off types. const ( - BoTiKVRPC BackoffType = iota - BoTiFlashRPC - BoTxnLock - BoTxnLockFast - BoPDRPC - BoRegionMiss - boTiKVServerBusy - boTiFlashServerBusy - boTxnNotFound - boStaleCmd - boMaxTsNotSynced + BoRegionMiss = retry.BoRegionMiss + BoTiKVRPC = retry.BoTiKVRPC + BoTiFlashRPC = retry.BoTiFlashRPC + BoTxnLockFast = retry.BoTxnLockFast + BoTxnLock = retry.BoTxnLock + BoPDRPC = retry.BoPDRPC ) -func (t BackoffType) createFn(vars *kv.Variables) func(context.Context, int) int { - if vars.Hook != nil { - vars.Hook(t.String(), vars) - } - switch t { - case BoTiKVRPC, BoTiFlashRPC: - return NewBackoffFn(100, 2000, EqualJitter) - case BoTxnLock: - return NewBackoffFn(200, 3000, EqualJitter) - case BoTxnLockFast: - return NewBackoffFn(vars.BackoffLockFast, 3000, EqualJitter) - case BoPDRPC: - return NewBackoffFn(500, 3000, EqualJitter) - case BoRegionMiss: - // change base time to 2ms, because it may recover soon. - return NewBackoffFn(2, 500, NoJitter) - case boTxnNotFound: - return NewBackoffFn(2, 500, NoJitter) - case boTiKVServerBusy, boTiFlashServerBusy: - return NewBackoffFn(2000, 10000, EqualJitter) - case boStaleCmd: - return NewBackoffFn(2, 1000, NoJitter) - case boMaxTsNotSynced: - return NewBackoffFn(2, 500, NoJitter) - } - return nil -} - -func (t BackoffType) String() string { - switch t { - case BoTiKVRPC: - return "tikvRPC" - case BoTiFlashRPC: - return "tiflashRPC" - case BoTxnLock: - return "txnLock" - case BoTxnLockFast: - return "txnLockFast" - case BoPDRPC: - return "pdRPC" - case BoRegionMiss: - return "regionMiss" - case boTiKVServerBusy: - return "tikvServerBusy" - case boTiFlashServerBusy: - return "tiflashServerBusy" - case boStaleCmd: - return "staleCommand" - case boTxnNotFound: - return "txnNotFound" - case boMaxTsNotSynced: - return "maxTsNotSynced" - } - return "" -} - -// TError returns pingcap/error of the backoff type. -func (t BackoffType) TError() error { - switch t { - case BoTiKVRPC: - return tikverr.ErrTiKVServerTimeout - case BoTiFlashRPC: - return tikverr.ErrTiFlashServerTimeout - case BoTxnLock, BoTxnLockFast, boTxnNotFound: - return tikverr.ErrResolveLockTimeout - case BoPDRPC: - return tikverr.NewErrPDServerTimeout("") - case BoRegionMiss: - return tikverr.ErrRegionUnavailable - case boTiKVServerBusy: - return tikverr.ErrTiKVServerBusy - case boTiFlashServerBusy: - return tikverr.ErrTiFlashServerBusy - case boStaleCmd: - return tikverr.ErrTiKVStaleCommand - case boMaxTsNotSynced: - return tikverr.ErrTiKVMaxTimestampNotSynced - } - return tikverr.ErrUnknown -} - // Maximum total sleep time(in ms) for kv/cop commands. 
const ( - GetAllMembersBackoff = 5000 - tsoMaxBackoff = 15000 - scannerNextMaxBackoff = 20000 - batchGetMaxBackoff = 20000 - getMaxBackoff = 20000 - cleanupMaxBackoff = 20000 - GcOneRegionMaxBackoff = 20000 - GcResolveLockMaxBackoff = 100000 - deleteRangeOneRegionMaxBackoff = 100000 - rawkvMaxBackoff = 20000 - splitRegionBackoff = 20000 - maxSplitRegionsBackoff = 120000 - waitScatterRegionFinishBackoff = 120000 - locateRegionMaxBackoff = 20000 - pessimisticLockMaxBackoff = 20000 - pessimisticRollbackMaxBackoff = 20000 + gcResolveLockMaxBackoff = 100000 ) var ( // CommitMaxBackoff is max sleep time of the 'commit' command CommitMaxBackoff = uint64(41000) - // PrewriteMaxBackoff is max sleep time of the `pre-write` command. PrewriteMaxBackoff = 20000 ) -// Backoffer is a utility for retrying queries. -type Backoffer struct { - ctx context.Context - - fn map[BackoffType]func(context.Context, int) int - maxSleep int - totalSleep int - errors []error - types []fmt.Stringer - vars *kv.Variables - noop bool - - backoffSleepMS map[BackoffType]int - backoffTimes map[BackoffType]int -} - -type txnStartCtxKeyType struct{} - -// TxnStartKey is a key for transaction start_ts info in context.Context. -var TxnStartKey interface{} = txnStartCtxKeyType{} - -// NewBackoffer (Deprecated) creates a Backoffer with maximum sleep time(in ms). -func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer { - return &Backoffer{ - ctx: ctx, - maxSleep: maxSleep, - vars: kv.DefaultVars, - } -} - // NewBackofferWithVars creates a Backoffer with maximum sleep time(in ms) and kv.Variables. func NewBackofferWithVars(ctx context.Context, maxSleep int, vars *kv.Variables) *Backoffer { - return NewBackoffer(ctx, maxSleep).withVars(vars) + return retry.NewBackofferWithVars(ctx, maxSleep, vars) } -// NewNoopBackoff create a Backoffer do nothing just return error directly -func NewNoopBackoff(ctx context.Context) *Backoffer { - return &Backoffer{ctx: ctx, noop: true} -} - -// withVars sets the kv.Variables to the Backoffer and return it. -func (b *Backoffer) withVars(vars *kv.Variables) *Backoffer { - if vars != nil { - b.vars = vars - } - // maxSleep is the max sleep time in millisecond. - // When it is multiplied by BackOffWeight, it should not be greater than MaxInt32. - if math.MaxInt32/b.vars.BackOffWeight >= b.maxSleep { - b.maxSleep *= b.vars.BackOffWeight - } - return b -} - -// Backoff sleeps a while base on the backoffType and records the error message. -// It returns a retryable error if total sleep time exceeds maxSleep. -func (b *Backoffer) Backoff(typ BackoffType, err error) error { - if span := opentracing.SpanFromContext(b.ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan(fmt.Sprintf("tikv.backoff.%s", typ), opentracing.ChildOf(span.Context())) - defer span1.Finish() - opentracing.ContextWithSpan(b.ctx, span1) - } - return b.BackoffWithMaxSleep(typ, -1, err) -} - -// BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message -// and never sleep more than maxSleepMs for each sleep. 
-func (b *Backoffer) BackoffWithMaxSleep(typ BackoffType, maxSleepMs int, err error) error { - if strings.Contains(err.Error(), tikverr.MismatchClusterID) { - logutil.BgLogger().Fatal("critical error", zap.Error(err)) - } - select { - case <-b.ctx.Done(): - return errors.Trace(err) - default: - } - - b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano))) - b.types = append(b.types, typ) - if b.noop || (b.maxSleep > 0 && b.totalSleep >= b.maxSleep) { - errMsg := fmt.Sprintf("%s backoffer.maxSleep %dms is exceeded, errors:", typ.String(), b.maxSleep) - for i, err := range b.errors { - // Print only last 3 errors for non-DEBUG log levels. - if log.GetLevel() == zapcore.DebugLevel || i >= len(b.errors)-3 { - errMsg += "\n" + err.Error() - } - } - logutil.BgLogger().Warn(errMsg) - // Use the first backoff type to generate a MySQL error. - return b.types[0].(BackoffType).TError() - } - - // Lazy initialize. - if b.fn == nil { - b.fn = make(map[BackoffType]func(context.Context, int) int) - } - f, ok := b.fn[typ] - if !ok { - f = typ.createFn(b.vars) - b.fn[typ] = f - } - - realSleep := f(b.ctx, maxSleepMs) - typ.metric().Observe(float64(realSleep) / 1000) - b.totalSleep += realSleep - if b.backoffSleepMS == nil { - b.backoffSleepMS = make(map[BackoffType]int) - } - b.backoffSleepMS[typ] += realSleep - if b.backoffTimes == nil { - b.backoffTimes = make(map[BackoffType]int) - } - b.backoffTimes[typ]++ - - stmtExec := b.ctx.Value(util.ExecDetailsKey) - if stmtExec != nil { - detail := stmtExec.(*util.ExecDetails) - atomic.AddInt64(&detail.BackoffDuration, int64(realSleep)*int64(time.Millisecond)) - atomic.AddInt64(&detail.BackoffCount, 1) - } - - if b.vars != nil && b.vars.Killed != nil { - if atomic.LoadUint32(b.vars.Killed) == 1 { - return tikverr.ErrQueryInterrupted - } - } - - var startTs interface{} - if ts := b.ctx.Value(TxnStartKey); ts != nil { - startTs = ts - } - logutil.Logger(b.ctx).Debug("retry later", - zap.Error(err), - zap.Int("totalSleep", b.totalSleep), - zap.Int("maxSleep", b.maxSleep), - zap.Stringer("type", typ), - zap.Reflect("txnStartTS", startTs)) - return nil -} - -func (b *Backoffer) String() string { - if b.totalSleep == 0 { - return "" - } - return fmt.Sprintf(" backoff(%dms %v)", b.totalSleep, b.types) -} - -// Clone creates a new Backoffer which keeps current Backoffer's sleep time and errors, and shares -// current Backoffer's context. -func (b *Backoffer) Clone() *Backoffer { - return &Backoffer{ - ctx: b.ctx, - maxSleep: b.maxSleep, - totalSleep: b.totalSleep, - errors: b.errors, - vars: b.vars, - } -} - -// Fork creates a new Backoffer which keeps current Backoffer's sleep time and errors, and holds -// a child context of current Backoffer's context. -func (b *Backoffer) Fork() (*Backoffer, context.CancelFunc) { - ctx, cancel := context.WithCancel(b.ctx) - return &Backoffer{ - ctx: ctx, - maxSleep: b.maxSleep, - totalSleep: b.totalSleep, - errors: b.errors, - vars: b.vars, - }, cancel -} - -// GetVars returns the binded vars. -func (b *Backoffer) GetVars() *kv.Variables { - return b.vars -} - -// GetTotalSleep returns total sleep time. -func (b *Backoffer) GetTotalSleep() int { - return b.totalSleep -} - -// GetTypes returns type list. -func (b *Backoffer) GetTypes() []fmt.Stringer { - return b.types -} - -// GetCtx returns the binded context. -func (b *Backoffer) GetCtx() context.Context { - return b.ctx +// NewBackoffer creates a Backoffer with maximum sleep time(in ms). 
+func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer { + return retry.NewBackoffer(ctx, maxSleep) } -// GetBackoffTimes returns a map contains backoff time count by type. -func (b *Backoffer) GetBackoffTimes() map[BackoffType]int { - return b.backoffTimes +// TxnStartKey is a key for transaction start_ts info in context.Context. +func TxnStartKey() interface{} { + return retry.TxnStartKey } -// GetBackoffSleepMS returns a map contains backoff sleep time by type. -func (b *Backoffer) GetBackoffSleepMS() map[BackoffType]int { - return b.backoffSleepMS +// NewGcResolveLockMaxBackoffer creates a Backoffer for Gc to resolve lock. +func NewGcResolveLockMaxBackoffer(ctx context.Context) *Backoffer { + return retry.NewBackofferWithVars(ctx, gcResolveLockMaxBackoff, nil) } diff --git a/store/tikv/cleanup.go b/store/tikv/cleanup.go index dc96ed32ab54c..0260d770cdd44 100644 --- a/store/tikv/cleanup.go +++ b/store/tikv/cleanup.go @@ -18,6 +18,7 @@ import ( pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -49,7 +50,7 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batc return errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index 886d20abf0e46..70f1cf27ccacc 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/store/tikv/config" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -662,7 +663,7 @@ func (c *batchCommandsClient) recreateStreamingClient(err error, streamClient *b *epoch++ c.failPendingRequests(err) // fail all pending requests. - b := NewBackofferWithVars(context.Background(), math.MaxInt32, nil) + b := retry.NewBackofferWithVars(context.Background(), math.MaxInt32, nil) for { // try to re-create the streaming in the loop. if c.isStopped() { return true @@ -672,7 +673,7 @@ func (c *batchCommandsClient) recreateStreamingClient(err error, streamClient *b break } - err2 := b.Backoff(BoTiKVRPC, err1) + err2 := b.Backoff(retry.BoTiKVRPC, err1) // As timeout is set to math.MaxUint32, err2 should always be nil. // This line is added to make the 'make errcheck' pass. 
terror.Log(err2) diff --git a/store/tikv/commit.go b/store/tikv/commit.go index ce9df6a927355..449081860c029 100644 --- a/store/tikv/commit.go +++ b/store/tikv/commit.go @@ -22,6 +22,7 @@ import ( tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -67,7 +68,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch return errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return errors.Trace(err) } @@ -86,7 +87,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch } if keyErr := commitResp.GetError(); keyErr != nil { if rejected := keyErr.GetCommitTsExpired(); rejected != nil { - logutil.Logger(bo.ctx).Info("2PC commitTS rejected by TiKV, retry with a newer commitTS", + logutil.Logger(bo.GetCtx()).Info("2PC commitTS rejected by TiKV, retry with a newer commitTS", zap.Uint64("txnStartTS", c.startTS), zap.Stringer("info", logutil.Hex(rejected))) @@ -101,7 +102,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch // Update commit ts and retry. commitTS, err := c.store.getTimestampWithRetry(bo, c.txn.GetScope()) if err != nil { - logutil.Logger(bo.ctx).Warn("2PC get commitTS failed", + logutil.Logger(bo.GetCtx()).Warn("2PC get commitTS failed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) return errors.Trace(err) @@ -126,7 +127,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch } return res } - logutil.Logger(bo.ctx).Error("2PC failed commit key after primary key committed", + logutil.Logger(bo.GetCtx()).Error("2PC failed commit key after primary key committed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS), zap.Uint64("commitTS", c.commitTS), @@ -134,7 +135,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch return errors.Trace(err) } // The transaction maybe rolled back by concurrent transactions. 
- logutil.Logger(bo.ctx).Debug("2PC failed commit primary key", + logutil.Logger(bo.GetCtx()).Debug("2PC failed commit primary key", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) return err @@ -149,10 +150,10 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch } func (c *twoPhaseCommitter) commitMutations(bo *Backoffer, mutations CommitterMutations) error { - if span := opentracing.SpanFromContext(bo.ctx); span != nil && span.Tracer() != nil { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("twoPhaseCommitter.commitMutations", opentracing.ChildOf(span.Context())) defer span1.Finish() - bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1) + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } return c.doActionOnMutations(bo, actionCommit{}, mutations) diff --git a/store/tikv/delete_range.go b/store/tikv/delete_range.go index 4cbe9fc039749..6a826d718cba1 100644 --- a/store/tikv/delete_range.go +++ b/store/tikv/delete_range.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/kv" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" ) @@ -78,6 +79,8 @@ func (t *DeleteRangeTask) Execute(ctx context.Context) error { return err } +const deleteRangeOneRegionMaxBackoff = 100000 + // Execute performs the delete range operation. func (t *DeleteRangeTask) sendReqOnRange(ctx context.Context, r kv.KeyRange) (RangeTaskStat, error) { startKey, rangeEndKey := r.StartKey, r.EndKey @@ -93,7 +96,7 @@ func (t *DeleteRangeTask) sendReqOnRange(ctx context.Context, r kv.KeyRange) (Ra break } - bo := NewBackofferWithVars(ctx, deleteRangeOneRegionMaxBackoff, nil) + bo := retry.NewBackofferWithVars(ctx, deleteRangeOneRegionMaxBackoff, nil) loc, err := t.store.GetRegionCache().LocateKey(bo, startKey) if err != nil { return stat, errors.Trace(err) @@ -121,7 +124,7 @@ func (t *DeleteRangeTask) sendReqOnRange(ctx context.Context, r kv.KeyRange) (Ra return stat, errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return stat, errors.Trace(err) } diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 0f4824a785ecb..6749d8c86fe07 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/store/tikv/metrics" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/oracle/oracles" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" pd "github.com/tikv/pd/client" "go.etcd.io/etcd/clientv3" @@ -244,7 +245,7 @@ func (s *KVStore) BeginWithMinStartTS(txnScope string, minStartTS uint64) (*KVTx // BeginWithMaxPrevSec begins transaction with given max previous seconds for startTS func (s *KVStore) BeginWithMaxPrevSec(txnScope string, maxPrevSec uint64) (*KVTxn, error) { - bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) + bo := retry.NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) minStartTS, err := s.getStalenessTimestamp(bo, txnScope, maxPrevSec) if err != nil { return nil, errors.Trace(err) @@ -287,7 +288,7 @@ func (s *KVStore) UUID() string { // CurrentTimestamp returns current timestamp with the given txnScope (local or global). 
func (s *KVStore) CurrentTimestamp(txnScope string) (uint64, error) { - bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) + bo := retry.NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) startTS, err := s.getTimestampWithRetry(bo, txnScope) if err != nil { return 0, errors.Trace(err) @@ -296,14 +297,14 @@ func (s *KVStore) CurrentTimestamp(txnScope string) (uint64, error) { } func (s *KVStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64, error) { - if span := opentracing.SpanFromContext(bo.ctx); span != nil && span.Tracer() != nil { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("TiKVStore.getTimestampWithRetry", opentracing.ChildOf(span.Context())) defer span1.Finish() - bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1) + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } for { - startTS, err := s.oracle.GetTimestamp(bo.ctx, &oracle.Option{TxnScope: txnScope}) + startTS, err := s.oracle.GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: txnScope}) // mockGetTSErrorInRetry should wait MockCommitErrorOnce first, then will run into retry() logic. // Then mockGetTSErrorInRetry will return retryable error when first retry. // Before PR #8743, we don't cleanup txn after meet error such as error like: PD server timeout @@ -317,7 +318,7 @@ func (s *KVStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64, if err == nil { return startTS, nil } - err = bo.Backoff(BoPDRPC, errors.Errorf("get timestamp failed: %v", err)) + err = bo.Backoff(retry.BoPDRPC, errors.Errorf("get timestamp failed: %v", err)) if err != nil { return 0, errors.Trace(err) } @@ -326,11 +327,11 @@ func (s *KVStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64, func (s *KVStore) getStalenessTimestamp(bo *Backoffer, txnScope string, prevSec uint64) (uint64, error) { for { - startTS, err := s.oracle.GetStaleTimestamp(bo.ctx, txnScope, prevSec) + startTS, err := s.oracle.GetStaleTimestamp(bo.GetCtx(), txnScope, prevSec) if err == nil { return startTS, nil } - err = bo.Backoff(BoPDRPC, errors.Errorf("get staleness timestamp failed: %v", err)) + err = bo.Backoff(retry.BoPDRPC, errors.Errorf("get staleness timestamp failed: %v", err)) if err != nil { return 0, errors.Trace(err) } diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 05feee6d31adb..fe50910a896e6 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/store/tikv/util" pd "github.com/tikv/pd/client" @@ -228,7 +229,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi // locks have been cleaned before GC. 
expiredLocks := locks - callerStartTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + callerStartTS, err := lr.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) if err != nil { return false, errors.Trace(err) } @@ -297,7 +298,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return false, errors.Trace(err) } @@ -473,8 +474,8 @@ func (t *txnExpireTime) value() int64 { // seconds before calling it after Prewrite. func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary []byte) (TxnStatus, error) { var status TxnStatus - bo := NewBackoffer(context.Background(), cleanupMaxBackoff) - currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + bo := retry.NewBackoffer(context.Background(), cleanupMaxBackoff) + currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) if err != nil { return status, err } @@ -493,7 +494,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart // Set currentTS to max uint64 to make the lock expired. currentTS = math.MaxUint64 } else { - currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) if err != nil { return TxnStatus{}, err } @@ -522,12 +523,12 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart // getTxnStatus() returns it when the secondary locks exist while the primary lock doesn't. // This is likely to happen in the concurrently prewrite when secondary regions // success before the primary region. 
- if err := bo.Backoff(boTxnNotFound, err); err != nil { - logutil.Logger(bo.ctx).Warn("getTxnStatusFromLock backoff fail", zap.Error(err)) + if err := bo.Backoff(retry.BoTxnNotFound, err); err != nil { + logutil.Logger(bo.GetCtx()).Warn("getTxnStatusFromLock backoff fail", zap.Error(err)) } if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) <= 0 { - logutil.Logger(bo.ctx).Warn("lock txn not found, lock has expired", + logutil.Logger(bo.GetCtx()).Warn("lock txn not found, lock has expired", zap.Uint64("CallerStartTs", callerStartTS), zap.Stringer("lock str", l)) if l.LockType == kvrpcpb.Op_PessimisticLock { @@ -599,7 +600,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte return status, errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return status, errors.Trace(err) } @@ -735,7 +736,7 @@ func (lr *LockResolver) checkSecondaries(bo *Backoffer, txnID uint64, curKeys [] return errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return errors.Trace(err) } @@ -866,7 +867,7 @@ func (lr *LockResolver) resolveRegionLocks(bo *Backoffer, l *Lock, region Region return errors.Trace(err) } if regionErr != nil { - err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return errors.Trace(err) } @@ -934,7 +935,7 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, li return errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return errors.Trace(err) } @@ -985,7 +986,7 @@ func (lr *LockResolver) resolvePessimisticLock(bo *Backoffer, l *Lock, cleanRegi return errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return errors.Trace(err) } @@ -997,7 +998,7 @@ func (lr *LockResolver) resolvePessimisticLock(bo *Backoffer, l *Lock, cleanRegi cmdResp := resp.Resp.(*kvrpcpb.PessimisticRollbackResponse) if keyErr := cmdResp.GetErrors(); len(keyErr) > 0 { err = errors.Errorf("unexpected resolve pessimistic lock err: %s, lock: %v", keyErr[0], l) - logutil.Logger(bo.ctx).Error("resolveLock error", zap.Error(err)) + logutil.Logger(bo.GetCtx()).Error("resolveLock error", zap.Error(err)) return err } return nil diff --git a/store/tikv/pessimistic.go b/store/tikv/pessimistic.go index 475efc7ad2a8b..445ced93ff904 100644 --- a/store/tikv/pessimistic.go +++ b/store/tikv/pessimistic.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -130,7 +131,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * return errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, 
errors.New(regionErr.String())) if err != nil { return errors.Trace(err) } @@ -228,7 +229,7 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *Bac return errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return errors.Trace(err) } @@ -247,11 +248,11 @@ func (c *twoPhaseCommitter) pessimisticLockMutations(bo *Backoffer, lockCtx *kv. for _, action := range strings.Split(v, ",") { if action == "delay" { duration := time.Duration(rand.Int63n(int64(time.Second) * 5)) - logutil.Logger(bo.ctx).Info("[failpoint] injected delay at pessimistic lock", + logutil.Logger(bo.GetCtx()).Info("[failpoint] injected delay at pessimistic lock", zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration)) time.Sleep(duration) } else if action == "fail" { - logutil.Logger(bo.ctx).Info("[failpoint] injected failure at pessimistic lock", + logutil.Logger(bo.GetCtx()).Info("[failpoint] injected failure at pessimistic lock", zap.Uint64("txnStartTS", c.startTS)) failpoint.Return(errors.New("injected failure at pessimistic lock")) } diff --git a/store/tikv/prewrite.go b/store/tikv/prewrite.go index 1fea4ba467341..ad2c3716b264a 100644 --- a/store/tikv/prewrite.go +++ b/store/tikv/prewrite.go @@ -27,6 +27,7 @@ import ( tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -130,7 +131,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff failpoint.Inject("prewritePrimaryFail", func() { // Delay to avoid cancelling other normally ongoing prewrite requests. time.Sleep(time.Millisecond * 50) - logutil.Logger(bo.ctx).Info("[failpoint] injected error on prewriting primary batch", + logutil.Logger(bo.GetCtx()).Info("[failpoint] injected error on prewriting primary batch", zap.Uint64("txnStartTS", c.startTS)) failpoint.Return(errors.New("injected error on prewriting primary batch")) }) @@ -139,7 +140,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff failpoint.Inject("prewriteSecondaryFail", func() { // Delay to avoid cancelling other normally ongoing prewrite requests. time.Sleep(time.Millisecond * 50) - logutil.Logger(bo.ctx).Info("[failpoint] injected error on prewriting secondary batch", + logutil.Logger(bo.GetCtx()).Info("[failpoint] injected error on prewriting secondary batch", zap.Uint64("txnStartTS", c.startTS)) failpoint.Return(errors.New("injected error on prewriting secondary batch")) }) @@ -150,7 +151,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff txnSize := uint64(c.regionTxnSize[batch.region.id]) // When we retry because of a region miss, we don't know the transaction size. We set the transaction size here // to MaxUint64 to avoid unexpected "resolve lock lite". 
- if len(bo.errors) > 0 { + if bo.HasErrors() { txnSize = math.MaxUint64 } @@ -175,7 +176,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff return errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return errors.Trace(err) } @@ -203,7 +204,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff if prewriteResp.MinCommitTs != 0 { return errors.Trace(errors.New("MinCommitTs must be 0 when 1pc falls back to 2pc")) } - logutil.Logger(bo.ctx).Warn("1pc failed and fallbacks to normal commit procedure", + logutil.Logger(bo.GetCtx()).Warn("1pc failed and fallbacks to normal commit procedure", zap.Uint64("startTS", c.startTS)) metrics.OnePCTxnCounterFallback.Inc() c.setOnePC(false) @@ -212,14 +213,14 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff // For 1PC, there's no racing to access to access `onePCCommmitTS` so it's safe // not to lock the mutex. if c.onePCCommitTS != 0 { - logutil.Logger(bo.ctx).Fatal("one pc happened multiple times", + logutil.Logger(bo.GetCtx()).Fatal("one pc happened multiple times", zap.Uint64("startTS", c.startTS)) } c.onePCCommitTS = prewriteResp.OnePcCommitTs } return nil } else if prewriteResp.OnePcCommitTs != 0 { - logutil.Logger(bo.ctx).Fatal("tikv committed a non-1pc transaction with 1pc protocol", + logutil.Logger(bo.GetCtx()).Fatal("tikv committed a non-1pc transaction with 1pc protocol", zap.Uint64("startTS", c.startTS)) } if c.isAsyncCommit() { @@ -230,7 +231,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff if c.testingKnobs.noFallBack { return nil } - logutil.Logger(bo.ctx).Warn("async commit cannot proceed since the returned minCommitTS is zero, "+ + logutil.Logger(bo.GetCtx()).Warn("async commit cannot proceed since the returned minCommitTS is zero, "+ "fallback to normal path", zap.Uint64("startTS", c.startTS)) c.setAsyncCommit(false) } else { @@ -268,7 +269,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff } atomic.AddInt64(&c.getDetail().ResolveLockTime, int64(time.Since(start))) if msBeforeExpired > 0 { - err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) + err = bo.BackoffWithMaxSleep(retry.BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) if err != nil { return errors.Trace(err) } @@ -277,10 +278,10 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff } func (c *twoPhaseCommitter) prewriteMutations(bo *Backoffer, mutations CommitterMutations) error { - if span := opentracing.SpanFromContext(bo.ctx); span != nil && span.Tracer() != nil { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("twoPhaseCommitter.prewriteMutations", opentracing.ChildOf(span.Context())) defer span1.Finish() - bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1) + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } // `doActionOnMutations` will unset `useOnePC` if the mutations is splitted into multiple batches. 
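The prewrite.go hunks above complete the caller-side change this patch repeats in every file: backoff types are referenced through the retry package (retry.BoRegionMiss, retry.BoTxnLock) and the Backoffer is consulted through accessors (GetCtx, SetCtx, HasErrors) rather than its now-package-private fields. Below is a minimal sketch of that retry-on-region-error loop, using only identifiers visible in this diff; the sendFn type and sendWithRegionRetry helper are hypothetical stand-ins, not part of the patch.

package example

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/store/tikv/retry"
)

// sendFn is a hypothetical stand-in for an RPC call that may report a
// retryable region error separately from a hard failure.
type sendFn func(ctx context.Context) (regionErr error, err error)

// sendWithRegionRetry retries send on region errors, sleeping through the
// Backoffer until its maximum sleep time (in ms) is exhausted.
func sendWithRegionRetry(ctx context.Context, maxSleepMs int, send sendFn) error {
	bo := retry.NewBackofferWithVars(ctx, maxSleepMs, nil)
	for {
		regionErr, err := send(bo.GetCtx())
		if err != nil {
			return errors.Trace(err)
		}
		if regionErr == nil {
			return nil
		}
		// Backoff records the error and sleeps; it returns a non-nil error
		// once the accumulated sleep exceeds the backoffer's budget.
		if err := bo.Backoff(retry.BoRegionMiss, regionErr); err != nil {
			return errors.Trace(err)
		}
	}
}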
diff --git a/store/tikv/range_task.go b/store/tikv/range_task.go index 1395fac0609a2..bc9b8fa9999c1 100644 --- a/store/tikv/range_task.go +++ b/store/tikv/range_task.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/pingcap/tidb/store/tikv/retry" "go.uber.org/zap" ) @@ -91,6 +92,8 @@ func (s *RangeTaskRunner) SetRegionsPerTask(regionsPerTask int) { s.regionsPerTask = regionsPerTask } +const locateRegionMaxBackoff = 20000 + // RunOnRange runs the task on the given range. // Empty startKey or endKey means unbounded. func (s *RangeTaskRunner) RunOnRange(ctx context.Context, startKey, endKey []byte) error { @@ -157,7 +160,7 @@ Loop: default: } - bo := NewBackofferWithVars(ctx, locateRegionMaxBackoff, nil) + bo := retry.NewBackofferWithVars(ctx, locateRegionMaxBackoff, nil) rangeEndKey, err := s.store.GetRegionCache().BatchLoadRegionsFromKey(bo, key, s.regionsPerTask) if err != nil { diff --git a/store/tikv/rawkv.go b/store/tikv/rawkv.go index 2a80d26917a20..b96828df187fd 100644 --- a/store/tikv/rawkv.go +++ b/store/tikv/rawkv.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/store/tikv/config" tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" pd "github.com/tikv/pd/client" ) @@ -110,6 +111,8 @@ func (c *RawKVClient) Get(key []byte) ([]byte, error) { return cmdResp.Value, nil } +const rawkvMaxBackoff = 20000 + // BatchGet queries values with the keys. func (c *RawKVClient) BatchGet(keys [][]byte) ([][]byte, error) { start := time.Now() @@ -117,7 +120,7 @@ func (c *RawKVClient) BatchGet(keys [][]byte) ([][]byte, error) { metrics.RawkvCmdHistogramWithBatchGet.Observe(time.Since(start).Seconds()) }() - bo := NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) + bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) resp, err := c.sendBatchReq(bo, keys, tikvrpc.CmdRawBatchGet) if err != nil { return nil, errors.Trace(err) @@ -184,7 +187,7 @@ func (c *RawKVClient) BatchPut(keys, values [][]byte) error { return errors.New("empty value is not supported") } } - bo := NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) + bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) err := c.sendBatchPut(bo, keys, values) return errors.Trace(err) } @@ -218,7 +221,7 @@ func (c *RawKVClient) BatchDelete(keys [][]byte) error { metrics.RawkvCmdHistogramWithBatchDelete.Observe(time.Since(start).Seconds()) }() - bo := NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) + bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) resp, err := c.sendBatchReq(bo, keys, tikvrpc.CmdRawBatchDelete) if err != nil { return errors.Trace(err) @@ -350,7 +353,7 @@ func (c *RawKVClient) ReverseScan(startKey, endKey []byte, limit int) (keys [][] } func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request, reverse bool) (*tikvrpc.Response, *KeyLocation, error) { - bo := NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) + bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) sender := NewRegionRequestSender(c.regionCache, c.rpcClient) for { var loc *KeyLocation @@ -372,7 +375,7 @@ func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request, reverse bool) (* return nil, nil, errors.Trace(err) } if regionErr != nil { - err := 
bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return nil, nil, errors.Trace(err) } @@ -456,7 +459,7 @@ func (c *RawKVClient) doBatchReq(bo *Backoffer, batch batch, cmdType tikvrpc.Cmd return batchResp } if regionErr != nil { - err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { batchResp.err = errors.Trace(err) return batchResp @@ -490,7 +493,7 @@ func (c *RawKVClient) doBatchReq(bo *Backoffer, batch batch, cmdType tikvrpc.Cmd // We can't use sendReq directly, because we need to know the end of the region before we send the request // TODO: Is there any better way to avoid duplicating code with func `sendReq` ? func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*tikvrpc.Response, []byte, error) { - bo := NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) + bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) sender := NewRegionRequestSender(c.regionCache, c.rpcClient) for { loc, err := c.regionCache.LocateKey(bo, startKey) @@ -517,7 +520,7 @@ func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*tikvr return nil, nil, errors.Trace(err) } if regionErr != nil { - err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return nil, nil, errors.Trace(err) } @@ -622,7 +625,7 @@ func (c *RawKVClient) doBatchPut(bo *Backoffer, batch batch) error { return errors.Trace(err) } if regionErr != nil { - err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index a73684fdf49c5..2528145675e0c 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -35,13 +35,14 @@ import ( "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" pd "github.com/tikv/pd/client" atomic2 "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/singleflight" "google.golang.org/grpc" - "google.golang.org/grpc/backoff" + gbackoff "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" @@ -194,7 +195,7 @@ func (r *Region) init(c *RegionCache) error { if !exists { store = c.getStoreByStoreID(p.StoreId) } - _, err := store.initResolve(NewNoopBackoff(context.Background()), c) + _, err := store.initResolve(retry.NewNoopBackoff(context.Background()), c) if err != nil { return err } @@ -425,7 +426,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe } if cachedRegion.checkNeedReload() { - // TODO: This may cause a fake EpochNotMatch error, and reload the region after a backoff. It's better to reload + // TODO: This may cause a fake EpochNotMatch error, and reload the region after a retry. It's better to reload // the region directly here. return nil, nil } @@ -644,7 +645,7 @@ func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool) // no region data, return error if failure. 
return nil, err } - logutil.Eventf(bo.ctx, "load region %d from pd, due to cache-miss", lr.GetID()) + logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID()) r = lr c.mu.Lock() c.insertRegionToCache(r) @@ -654,10 +655,10 @@ func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool) lr, err := c.loadRegion(bo, key, isEndKey) if err != nil { // ignore error and use old region info. - logutil.Logger(bo.ctx).Error("load region failure", + logutil.Logger(bo.GetCtx()).Error("load region failure", zap.ByteString("key", key), zap.Error(err)) } else { - logutil.Eventf(bo.ctx, "load region %d from pd, due to need-reload", lr.GetID()) + logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID()) r = lr c.mu.Lock() c.insertRegionToCache(r) @@ -674,7 +675,7 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload if r != nil { peersNum := len(r.meta.Peers) if len(ctx.Meta.Peers) != peersNum { - logutil.Logger(bo.ctx).Info("retry and refresh current ctx after send request fail and up/down stores length changed", + logutil.Logger(bo.GetCtx()).Info("retry and refresh current ctx after send request fail and up/down stores length changed", zap.Stringer("current", ctx), zap.Bool("needReload", scheduleReload), zap.Reflect("oldPeers", ctx.Meta.Peers), @@ -727,20 +728,20 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload // In case the epoch of the store is increased, try to avoid reloading the current region by also // increasing the epoch stored in `rs`. rs.switchNextProxyStore(r, currentProxyIdx, incEpochStoreIdx) - logutil.Logger(bo.ctx).Info("switch region proxy peer to next due to send request fail", + logutil.Logger(bo.GetCtx()).Info("switch region proxy peer to next due to send request fail", zap.Stringer("current", ctx), zap.Bool("needReload", scheduleReload), zap.Error(err)) } else { rs.switchNextTiKVPeer(r, ctx.AccessIdx) - logutil.Logger(bo.ctx).Info("switch region peer to next due to send request fail", + logutil.Logger(bo.GetCtx()).Info("switch region peer to next due to send request fail", zap.Stringer("current", ctx), zap.Bool("needReload", scheduleReload), zap.Error(err)) } } else { rs.switchNextFlashPeer(r, ctx.AccessIdx) - logutil.Logger(bo.ctx).Info("switch region tiflash peer to next due to send request fail", + logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail", zap.Stringer("current", ctx), zap.Bool("needReload", scheduleReload), zap.Error(err)) @@ -763,7 +764,7 @@ func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLoca lr, err := c.loadRegionByID(bo, regionID) if err != nil { // ignore error and use old region info. - logutil.Logger(bo.ctx).Error("load region failure", + logutil.Logger(bo.GetCtx()).Error("load region failure", zap.Uint64("regionID", regionID), zap.Error(err)) } else { r = lr @@ -1125,7 +1126,7 @@ func filterUnavailablePeers(region *pd.Region) { // If the given key is the end key of the region that you want, you may set the second argument to true. This is useful // when processing in reverse order. 
func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Region, error) { - ctx := bo.ctx + ctx := bo.GetCtx() if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("loadRegion", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -1136,7 +1137,7 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg searchPrev := false for { if backoffErr != nil { - err := bo.Backoff(BoPDRPC, backoffErr) + err := bo.Backoff(retry.BoPDRPC, backoffErr) if err != nil { return nil, errors.Trace(err) } @@ -1183,7 +1184,7 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg // loadRegionByID loads region from pd client, and picks the first peer as leader. func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, error) { - ctx := bo.ctx + ctx := bo.GetCtx() if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("loadRegionByID", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -1192,7 +1193,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e var backoffErr error for { if backoffErr != nil { - err := bo.Backoff(BoPDRPC, backoffErr) + err := bo.Backoff(retry.BoPDRPC, backoffErr) if err != nil { return nil, errors.Trace(err) } @@ -1232,7 +1233,7 @@ func (c *RegionCache) scanRegions(bo *Backoffer, startKey, endKey []byte, limit if limit == 0 { return nil, nil } - ctx := bo.ctx + ctx := bo.GetCtx() if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("scanRegions", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -1242,7 +1243,7 @@ func (c *RegionCache) scanRegions(bo *Backoffer, startKey, endKey []byte, limit var backoffErr error for { if backoffErr != nil { - err := bo.Backoff(BoPDRPC, backoffErr) + err := bo.Backoff(retry.BoPDRPC, backoffErr) if err != nil { return nil, errors.Trace(err) } @@ -1405,14 +1406,14 @@ func (c *RegionCache) getStoresByLabels(labels []*metapb.StoreLabel) []*Store { // OnRegionEpochNotMatch removes the old region and inserts new regions into the cache. func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, currentRegions []*metapb.Region) error { - // Find whether the region epoch in `ctx` is ahead of TiKV's. If so, backoff. + // Find whether the region epoch in `ctx` is ahead of TiKV's. If so, retry. for _, meta := range currentRegions { if meta.GetId() == ctx.Region.id && (meta.GetRegionEpoch().GetConfVer() < ctx.Region.confVer || meta.GetRegionEpoch().GetVersion() < ctx.Region.ver) { err := errors.Errorf("region epoch is ahead of tikv. 
rpc ctx: %+v, currentRegions: %+v", ctx, currentRegions) logutil.BgLogger().Info("region epoch is ahead of tikv", zap.Error(err)) - return bo.Backoff(BoRegionMiss, err) + return bo.Backoff(retry.BoRegionMiss, err) } } @@ -1775,7 +1776,7 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err } var store *metapb.Store for { - store, err = c.pdClient.GetStore(bo.ctx, s.storeID) + store, err = c.pdClient.GetStore(bo.GetCtx(), s.storeID) if err != nil { metrics.RegionCacheCounterWithGetStoreError.Inc() } else { @@ -1787,7 +1788,7 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err return } err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", s.storeID, err) - if err = bo.Backoff(BoPDRPC, err); err != nil { + if err = bo.Backoff(retry.BoPDRPC, err); err != nil { return } continue @@ -1990,7 +1991,7 @@ func (s *Store) checkUntilHealth(c *RegionCache) { } } - bo := NewNoopBackoff(ctx) + bo := retry.NewNoopBackoff(ctx) l := s.requestLiveness(bo, c) if l == reachable { logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID)) @@ -2020,7 +2021,7 @@ func (s *Store) requestLiveness(bo *Backoffer, c *RegionCache) (l livenessState) }) var ctx context.Context if bo != nil { - ctx = bo.ctx + ctx = bo.GetCtx() } else { ctx = context.Background() } @@ -2110,7 +2111,7 @@ func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, h grpc.WithInitialWindowSize(grpcInitialWindowSize), grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize), grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ + Backoff: gbackoff.Config{ BaseDelay: 100 * time.Millisecond, // Default was 1s. Multiplier: 1.6, // Default Jitter: 0.2, // Default diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index efb2ae9df73ab..7b0e5884abb2a 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/mockstore/mocktikv" + "github.com/pingcap/tidb/store/tikv/retry" pd "github.com/tikv/pd/client" ) @@ -310,7 +311,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) { ctx, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, seed) c.Assert(err, IsNil) c.Assert(ctx.Addr, Equals, "store2") - s.cache.OnSendFail(NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail")) + s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail")) s.cache.checkAndResolve(nil) s.cache.UpdateLeader(loc.Region, s.store2, 0) addr := s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0) @@ -870,7 +871,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV(c *C) { c.Assert(err, IsNil) err = cache.OnRegionEpochNotMatch(bo, &RPCContext{Region: region.VerID()}, []*metapb.Region{&r2}) c.Assert(err, IsNil) - c.Assert(len(bo.errors), Equals, 2) + c.Assert(bo.ErrorsNum(), Equals, 2) } func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash(c *C) { @@ -1329,7 +1330,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange(c *C) { s.cache.insertRegionToCache(region) // OnSendFail should not panic - s.cache.OnSendFail(NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail")) + s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail")) } func createSampleRegion(startKey, endKey []byte) *Region { diff --git 
a/store/tikv/region_request.go b/store/tikv/region_request.go index cad0ed0379e96..646053c681aaa 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/store/tikv/util" ) @@ -223,12 +224,12 @@ func (s *RegionRequestSender) SendReqCtx( rpcCtx *RPCContext, err error, ) { - if span := opentracing.SpanFromContext(bo.ctx); span != nil && span.Tracer() != nil { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("regionRequest.SendReqCtx", opentracing.ChildOf(span.Context())) defer span1.Finish() // TODO(MyonKeminta): Make sure trace works without cloning the backoffer. // bo = bo.Clone() - bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1) + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } failpoint.Inject("tikvStoreSendReqResult", func(val failpoint.Value) { @@ -248,9 +249,7 @@ func (s *RegionRequestSender) SendReqCtx( }, nil, nil) } case "callBackofferHook": - if bo.vars != nil && bo.vars.Hook != nil { - bo.vars.Hook("callBackofferHook", bo.vars) - } + bo.SetVarsHook("callBackofferHook", bo.GetVars()) case "requestTiDBStoreError": if et == tikvrpc.TiDB { failpoint.Return(nil, nil, tikverr.ErrTiKVServerTimeout) @@ -265,7 +264,7 @@ func (s *RegionRequestSender) SendReqCtx( tryTimes := 0 for { if (tryTimes > 0) && (tryTimes%1000 == 0) { - logutil.Logger(bo.ctx).Warn("retry get ", zap.Uint64("region = ", regionID.GetID()), zap.Int("times = ", tryTimes)) + logutil.Logger(bo.GetCtx()).Warn("retry get ", zap.Uint64("region = ", regionID.GetID()), zap.Int("times = ", tryTimes)) } rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...) @@ -275,7 +274,7 @@ func (s *RegionRequestSender) SendReqCtx( failpoint.Inject("invalidCacheAndRetry", func() { // cooperate with github.com/pingcap/tidb/store/gcworker/setGcResolveMaxBackoff - if c := bo.ctx.Value("injectedBackoff"); c != nil { + if c := bo.GetCtx().Value("injectedBackoff"); c != nil { resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}) failpoint.Return(resp, nil, err) } @@ -287,12 +286,12 @@ func (s *RegionRequestSender) SendReqCtx( // TODO: Change the returned error to something like "region missing in cache", // and handle this error like EpochNotMatch, which means to re-split the request and retry. 
- logutil.Logger(bo.ctx).Debug("throwing pseudo region error due to region not found in cache", zap.Stringer("region", ®ionID)) + logutil.Logger(bo.GetCtx()).Debug("throwing pseudo region error due to region not found in cache", zap.Stringer("region", ®ionID)) resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}) return resp, nil, err } - logutil.Eventf(bo.ctx, "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr) + logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr) s.storeAddr = rpcCtx.Addr var retry bool resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout) @@ -301,7 +300,8 @@ func (s *RegionRequestSender) SendReqCtx( } // recheck whether the session/query is killed during the Next() - if bo.vars != nil && bo.vars.Killed != nil && atomic.LoadUint32(bo.vars.Killed) == 1 { + boVars := bo.GetVars() + if boVars != nil && boVars.Killed != nil && atomic.LoadUint32(boVars.Killed) == 1 { return nil, nil, tikverr.ErrQueryInterrupted } failpoint.Inject("mockRetrySendReqToRegion", func(val failpoint.Value) { @@ -392,7 +392,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, rpcCtx *RPCContext, defer s.releaseStoreToken(rpcCtx.Store) } - ctx := bo.ctx + ctx := bo.GetCtx() if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil { var cancel context.CancelFunc ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx) @@ -410,7 +410,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, rpcCtx *RPCContext, } var sessionID uint64 - if v := bo.ctx.Value(util.SessionID); v != nil { + if v := bo.GetCtx().Value(util.SessionID); v != nil { sessionID = v.(uint64) } @@ -443,7 +443,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, rpcCtx *RPCContext, RecordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start)) failpoint.Inject("tikvStoreRespResult", func(val failpoint.Value) { if val.(bool) { - if req.Type == tikvrpc.CmdCop && bo.totalSleep == 0 { + if req.Type == tikvrpc.CmdCop && bo.GetTotalSleep() == 0 { failpoint.Return(&tikvrpc.Response{ Resp: &coprocessor.Response{RegionError: &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}}, }, false, nil) @@ -545,12 +545,12 @@ func (s *RegionRequestSender) releaseStoreToken(st *Store) { } func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err error) error { - if span := opentracing.SpanFromContext(bo.ctx); span != nil && span.Tracer() != nil { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("regionRequest.onSendFail", opentracing.ChildOf(span.Context())) defer span1.Finish() // TODO(MyonKeminta): Make sure trace works without cloning the backoffer. // bo = bo.Clone() - bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1) + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } // If it failed because the context is cancelled by ourself, don't retry. if errors.Cause(err) == context.Canceled { @@ -560,7 +560,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err } if status.Code(errors.Cause(err)) == codes.Canceled { select { - case <-bo.ctx.Done(): + case <-bo.GetCtx().Done(): return errors.Trace(err) default: // If we don't cancel, but the error code is Canceled, it must be from grpc remote. 
@@ -579,9 +579,9 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err // TODO: the number of retry time should be limited:since region may be unavailable // when some unrecoverable disaster happened. if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash { - err = bo.Backoff(BoTiFlashRPC, errors.Errorf("send tiflash request error: %v, ctx: %v, try next peer later", err, ctx)) + err = bo.Backoff(retry.BoTiFlashRPC, errors.Errorf("send tiflash request error: %v, ctx: %v, try next peer later", err, ctx)) } else { - err = bo.Backoff(BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx)) + err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx)) } return errors.Trace(err) } @@ -633,13 +633,13 @@ func regionErrorToLabel(e *errorpb.Error) string { return "unknown" } -func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed *uint32, regionErr *errorpb.Error) (retry bool, err error) { - if span := opentracing.SpanFromContext(bo.ctx); span != nil && span.Tracer() != nil { +func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed *uint32, regionErr *errorpb.Error) (shouldRetry bool, err error) { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("tikv.onRegionError", opentracing.ChildOf(span.Context())) defer span1.Finish() // TODO(MyonKeminta): Make sure trace works without cloning the backoffer. // bo = bo.Clone() - bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1) + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc() @@ -655,7 +655,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed // isolated and removed from the Raft group. So it's necessary to reload // the region from PD. 
s.regionCache.InvalidateCachedRegionWithReason(ctx.Region, NoLeader) - if err = bo.Backoff(BoRegionMiss, errors.Errorf("not leader: %v, ctx: %v", notLeader, ctx)); err != nil { + if err = bo.Backoff(retry.BoRegionMiss, errors.Errorf("not leader: %v, ctx: %v", notLeader, ctx)); err != nil { return false, errors.Trace(err) } } else { @@ -691,9 +691,9 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed zap.String("reason", regionErr.GetServerIsBusy().GetReason()), zap.Stringer("ctx", ctx)) if ctx != nil && ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash { - err = bo.Backoff(boTiFlashServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) + err = bo.Backoff(retry.BoTiFlashServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) } else { - err = bo.Backoff(boTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) + err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) } if err != nil { return false, errors.Trace(err) @@ -702,7 +702,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed } if regionErr.GetStaleCommand() != nil { logutil.BgLogger().Debug("tikv reports `StaleCommand`", zap.Stringer("ctx", ctx)) - err = bo.Backoff(boStaleCmd, errors.Errorf("stale command, ctx: %v", ctx)) + err = bo.Backoff(retry.BoStaleCmd, errors.Errorf("stale command, ctx: %v", ctx)) if err != nil { return false, errors.Trace(err) } @@ -719,7 +719,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed } if regionErr.GetMaxTimestampNotSynced() != nil { logutil.BgLogger().Warn("tikv reports `MaxTimestampNotSynced`", zap.Stringer("ctx", ctx)) - err = bo.Backoff(boMaxTsNotSynced, errors.Errorf("max timestamp not synced, ctx: %v", ctx)) + err = bo.Backoff(retry.BoMaxTsNotSynced, errors.Errorf("max timestamp not synced, ctx: %v", ctx)) if err != nil { return false, errors.Trace(err) } diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index 81e9cc4498a07..5e7752b97fccb 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pingcap/tidb/store/tikv/kv" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/config" "github.com/pingcap/tidb/store/tikv/mockstore/mocktikv" @@ -71,7 +72,7 @@ func (s *testRegionRequestToSingleStoreSuite) SetUpTest(c *C) { s.store, s.peer, s.region = mocktikv.BootstrapWithSingleStore(s.cluster) pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster)} s.cache = NewRegionCache(pdCli) - s.bo = NewNoopBackoff(context.Background()) + s.bo = retry.NewNoopBackoff(context.Background()) s.mvccStore = mocktikv.MustNewMVCCStore() client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil) s.regionRequestSender = NewRegionRequestSender(s.cache, client) @@ -82,7 +83,7 @@ func (s *testRegionRequestToThreeStoresSuite) SetUpTest(c *C) { s.storeIDs, s.peerIDs, s.regionID, s.leaderPeer = mocktikv.BootstrapWithMultiStores(s.cluster, 3) pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster)} s.cache = NewRegionCache(pdCli) - s.bo = NewNoopBackoff(context.Background()) + s.bo = retry.NewNoopBackoff(context.Background()) s.mvccStore = mocktikv.MustNewMVCCStore() client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil) s.regionRequestSender = NewRegionRequestSender(s.cache, client) diff --git a/store/tikv/retry/backoff.go b/store/tikv/retry/backoff.go new file 
mode 100644 index 0000000000000..b2e0137902bce --- /dev/null +++ b/store/tikv/retry/backoff.go @@ -0,0 +1,439 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "context" + "fmt" + "math" + "math/rand" + "strings" + "sync/atomic" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/pingcap/errors" + "github.com/pingcap/log" + tikverr "github.com/pingcap/tidb/store/tikv/error" + "github.com/pingcap/tidb/store/tikv/kv" + "github.com/pingcap/tidb/store/tikv/logutil" + "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/pingcap/tidb/store/tikv/util" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + // NoJitter makes the backoff sequence strict exponential. + NoJitter = 1 + iota + // FullJitter applies random factors to strict exponential. + FullJitter + // EqualJitter is also randomized, but prevents very short sleeps. + EqualJitter + // DecorrJitter increases the maximum jitter based on the last random value. + DecorrJitter +) + +func (t BackoffType) metric() prometheus.Observer { + switch t { + // TODO: distinguish tikv and tiflash in metrics + case BoTiKVRPC, BoTiFlashRPC: + return metrics.BackoffHistogramRPC + case BoTxnLock: + return metrics.BackoffHistogramLock + case BoTxnLockFast: + return metrics.BackoffHistogramLockFast + case BoPDRPC: + return metrics.BackoffHistogramPD + case BoRegionMiss: + return metrics.BackoffHistogramRegionMiss + case BoTiKVServerBusy, BoTiFlashServerBusy: + return metrics.BackoffHistogramServerBusy + case BoStaleCmd: + return metrics.BackoffHistogramStaleCmd + } + return metrics.BackoffHistogramEmpty +} + +// NewBackoffFn creates a backoff func which implements exponential backoff with +// optional jitters. +// See http://www.awsarchitectureblog.com/2015/03/backoff.html +func NewBackoffFn(base, cap, jitter int) func(ctx context.Context, maxSleepMs int) int { + if base < 2 { + // To prevent panic in 'rand.Intn'. + base = 2 + } + attempts := 0 + lastSleep := base + return func(ctx context.Context, maxSleepMs int) int { + var sleep int + switch jitter { + case NoJitter: + sleep = expo(base, cap, attempts) + case FullJitter: + v := expo(base, cap, attempts) + sleep = rand.Intn(v) + case EqualJitter: + v := expo(base, cap, attempts) + sleep = v/2 + rand.Intn(v/2) + case DecorrJitter: + sleep = int(math.Min(float64(cap), float64(base+rand.Intn(lastSleep*3-base)))) + } + logutil.BgLogger().Debug("backoff", + zap.Int("base", base), + zap.Int("sleep", sleep), + zap.Int("attempts", attempts)) + + realSleep := sleep + // When maxSleepMs >= 0 is passed in via `tikv.BackoffWithMaxSleep`, the actual sleep is capped at maxSleepMs milliseconds. 
+ if maxSleepMs >= 0 && realSleep > maxSleepMs { + realSleep = maxSleepMs + } + select { + case <-time.After(time.Duration(realSleep) * time.Millisecond): + attempts++ + lastSleep = sleep + return realSleep + case <-ctx.Done(): + return 0 + } + } +} + +func expo(base, cap, n int) int { + return int(math.Min(float64(cap), float64(base)*math.Pow(2.0, float64(n)))) +} + +// BackoffType defines the backoff type. +type BackoffType int + +// Back off types. +const ( + BoTiKVRPC BackoffType = iota + BoTiFlashRPC + BoTxnLock + BoTxnLockFast + BoPDRPC + BoRegionMiss + BoTiKVServerBusy + BoTiFlashServerBusy + BoTxnNotFound + BoStaleCmd + BoMaxTsNotSynced +) + +func (t BackoffType) createFn(vars *kv.Variables) func(context.Context, int) int { + if vars.Hook != nil { + vars.Hook(t.String(), vars) + } + switch t { + case BoTiKVRPC, BoTiFlashRPC: + return NewBackoffFn(100, 2000, EqualJitter) + case BoTxnLock: + return NewBackoffFn(200, 3000, EqualJitter) + case BoTxnLockFast: + return NewBackoffFn(vars.BackoffLockFast, 3000, EqualJitter) + case BoPDRPC: + return NewBackoffFn(500, 3000, EqualJitter) + case BoRegionMiss: + // change base time to 2ms, because it may recover soon. + return NewBackoffFn(2, 500, NoJitter) + case BoTxnNotFound: + return NewBackoffFn(2, 500, NoJitter) + case BoTiKVServerBusy, BoTiFlashServerBusy: + return NewBackoffFn(2000, 10000, EqualJitter) + case BoStaleCmd: + return NewBackoffFn(2, 1000, NoJitter) + case BoMaxTsNotSynced: + return NewBackoffFn(2, 500, NoJitter) + } + return nil +} + +func (t BackoffType) String() string { + switch t { + case BoTiKVRPC: + return "tikvRPC" + case BoTiFlashRPC: + return "tiflashRPC" + case BoTxnLock: + return "txnLock" + case BoTxnLockFast: + return "txnLockFast" + case BoPDRPC: + return "pdRPC" + case BoRegionMiss: + return "regionMiss" + case BoTiKVServerBusy: + return "tikvServerBusy" + case BoTiFlashServerBusy: + return "tiflashServerBusy" + case BoStaleCmd: + return "staleCommand" + case BoTxnNotFound: + return "txnNotFound" + case BoMaxTsNotSynced: + return "maxTsNotSynced" + } + return "" +} + +// TError returns pingcap/error of the backoff type. +func (t BackoffType) TError() error { + switch t { + case BoTiKVRPC: + return tikverr.ErrTiKVServerTimeout + case BoTiFlashRPC: + return tikverr.ErrTiFlashServerTimeout + case BoTxnLock, BoTxnLockFast, BoTxnNotFound: + return tikverr.ErrResolveLockTimeout + case BoPDRPC: + return tikverr.NewErrPDServerTimeout("") + case BoRegionMiss: + return tikverr.ErrRegionUnavailable + case BoTiKVServerBusy: + return tikverr.ErrTiKVServerBusy + case BoTiFlashServerBusy: + return tikverr.ErrTiFlashServerBusy + case BoStaleCmd: + return tikverr.ErrTiKVStaleCommand + case BoMaxTsNotSynced: + return tikverr.ErrTiKVMaxTimestampNotSynced + } + return tikverr.ErrUnknown +} + +// Backoffer is a utility for retrying queries. +type Backoffer struct { + ctx context.Context + + fn map[BackoffType]func(context.Context, int) int + maxSleep int + totalSleep int + errors []error + types []fmt.Stringer + vars *kv.Variables + noop bool + + backoffSleepMS map[BackoffType]int + backoffTimes map[BackoffType]int +} + +type txnStartCtxKeyType struct{} + +// TxnStartKey is a key for transaction start_ts info in context.Context. +var TxnStartKey interface{} = txnStartCtxKeyType{} + +// NewBackoffer (Deprecated) creates a Backoffer with maximum sleep time(in ms). 
+func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer { + return &Backoffer{ + ctx: ctx, + maxSleep: maxSleep, + vars: kv.DefaultVars, + } +} + +// NewBackofferWithVars creates a Backoffer with maximum sleep time (in ms) and kv.Variables. +func NewBackofferWithVars(ctx context.Context, maxSleep int, vars *kv.Variables) *Backoffer { + return NewBackoffer(ctx, maxSleep).withVars(vars) +} + +// NewNoopBackoff creates a Backoffer that does nothing but return the error directly. +func NewNoopBackoff(ctx context.Context) *Backoffer { + return &Backoffer{ctx: ctx, noop: true} +} + +// withVars sets the kv.Variables to the Backoffer and returns it. +func (b *Backoffer) withVars(vars *kv.Variables) *Backoffer { + if vars != nil { + b.vars = vars + } + // maxSleep is the max sleep time in milliseconds. + // When it is multiplied by BackOffWeight, it should not be greater than MaxInt32. + if math.MaxInt32/b.vars.BackOffWeight >= b.maxSleep { + b.maxSleep *= b.vars.BackOffWeight + } + return b +} + +// Backoff sleeps a while based on the backoffType and records the error message. +// It returns a retryable error if total sleep time exceeds maxSleep. +func (b *Backoffer) Backoff(typ BackoffType, err error) error { + if span := opentracing.SpanFromContext(b.ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan(fmt.Sprintf("tikv.backoff.%s", typ), opentracing.ChildOf(span.Context())) + defer span1.Finish() + opentracing.ContextWithSpan(b.ctx, span1) + } + return b.BackoffWithMaxSleep(typ, -1, err) +} + +// BackoffWithMaxSleep sleeps a while based on the backoffType and records the error message, +// and never sleeps more than maxSleepMs for each sleep. +func (b *Backoffer) BackoffWithMaxSleep(typ BackoffType, maxSleepMs int, err error) error { + if strings.Contains(err.Error(), tikverr.MismatchClusterID) { + logutil.BgLogger().Fatal("critical error", zap.Error(err)) + } + select { + case <-b.ctx.Done(): + return errors.Trace(err) + default: + } + + b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano))) + b.types = append(b.types, typ) + if b.noop || (b.maxSleep > 0 && b.totalSleep >= b.maxSleep) { + errMsg := fmt.Sprintf("%s backoffer.maxSleep %dms is exceeded, errors:", typ.String(), b.maxSleep) + for i, err := range b.errors { + // Print only last 3 errors for non-DEBUG log levels. + if log.GetLevel() == zapcore.DebugLevel || i >= len(b.errors)-3 { + errMsg += "\n" + err.Error() + } + } + logutil.BgLogger().Warn(errMsg) + // Use the first backoff type to generate a MySQL error. + return b.types[0].(BackoffType).TError() + } + + // Lazy initialize. 
+ if b.fn == nil { + b.fn = make(map[BackoffType]func(context.Context, int) int) + } + f, ok := b.fn[typ] + if !ok { + f = typ.createFn(b.vars) + b.fn[typ] = f + } + + realSleep := f(b.ctx, maxSleepMs) + typ.metric().Observe(float64(realSleep) / 1000) + b.totalSleep += realSleep + if b.backoffSleepMS == nil { + b.backoffSleepMS = make(map[BackoffType]int) + } + b.backoffSleepMS[typ] += realSleep + if b.backoffTimes == nil { + b.backoffTimes = make(map[BackoffType]int) + } + b.backoffTimes[typ]++ + + stmtExec := b.ctx.Value(util.ExecDetailsKey) + if stmtExec != nil { + detail := stmtExec.(*util.ExecDetails) + atomic.AddInt64(&detail.BackoffDuration, int64(realSleep)*int64(time.Millisecond)) + atomic.AddInt64(&detail.BackoffCount, 1) + } + + if b.vars != nil && b.vars.Killed != nil { + if atomic.LoadUint32(b.vars.Killed) == 1 { + return tikverr.ErrQueryInterrupted + } + } + + var startTs interface{} + if ts := b.ctx.Value(TxnStartKey); ts != nil { + startTs = ts + } + logutil.Logger(b.ctx).Debug("retry later", + zap.Error(err), + zap.Int("totalSleep", b.totalSleep), + zap.Int("maxSleep", b.maxSleep), + zap.Stringer("type", typ), + zap.Reflect("txnStartTS", startTs)) + return nil +} + +func (b *Backoffer) String() string { + if b.totalSleep == 0 { + return "" + } + return fmt.Sprintf(" backoff(%dms %v)", b.totalSleep, b.types) +} + +// Clone creates a new Backoffer which keeps current Backoffer's sleep time and errors, and shares +// current Backoffer's context. +func (b *Backoffer) Clone() *Backoffer { + return &Backoffer{ + ctx: b.ctx, + maxSleep: b.maxSleep, + totalSleep: b.totalSleep, + errors: b.errors, + vars: b.vars, + } +} + +// Fork creates a new Backoffer which keeps current Backoffer's sleep time and errors, and holds +// a child context of current Backoffer's context. +func (b *Backoffer) Fork() (*Backoffer, context.CancelFunc) { + ctx, cancel := context.WithCancel(b.ctx) + return &Backoffer{ + ctx: ctx, + maxSleep: b.maxSleep, + totalSleep: b.totalSleep, + errors: b.errors, + vars: b.vars, + }, cancel +} + +// GetVars returns the bound vars. +func (b *Backoffer) GetVars() *kv.Variables { + return b.vars +} + +// GetTotalSleep returns the total sleep time. +func (b *Backoffer) GetTotalSleep() int { + return b.totalSleep +} + +// GetTypes returns the type list. +func (b *Backoffer) GetTypes() []fmt.Stringer { + return b.types +} + +// GetCtx returns the bound context. +func (b *Backoffer) GetCtx() context.Context { + return b.ctx +} + +// SetCtx sets the bound context to ctx. +func (b *Backoffer) SetCtx(ctx context.Context) { + b.ctx = ctx +} + +// GetBackoffTimes returns a map that contains the backoff count by type. +func (b *Backoffer) GetBackoffTimes() map[BackoffType]int { + return b.backoffTimes +} + +// GetBackoffSleepMS returns a map that contains the backoff sleep time by type. +func (b *Backoffer) GetBackoffSleepMS() map[BackoffType]int { + return b.backoffSleepMS +} + +// HasErrors returns true if b.errors is not empty. +func (b *Backoffer) HasErrors() bool { + return len(b.errors) != 0 +} + +// ErrorsNum returns the number of errors. +func (b *Backoffer) ErrorsNum() int { + return len(b.errors) +} + +// SetVarsHook triggers the vars.Hook; it is used in tests to verify that the variables take effect. 
+func (b *Backoffer) SetVarsHook(name string, vars *kv.Variables) { + if b.vars != nil && b.vars.Hook != nil { + b.vars.Hook(name, vars) + } +} diff --git a/store/tikv/backoff_test.go b/store/tikv/retry/backoff_test.go similarity index 98% rename from store/tikv/backoff_test.go rename to store/tikv/retry/backoff_test.go index 11254937abd72..f8dfb9ed120f3 100644 --- a/store/tikv/backoff_test.go +++ b/store/tikv/retry/backoff_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tikv +package retry import ( "context" diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 19a14b3f73819..6c43b7bdee7cd 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -22,6 +22,7 @@ import ( tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "go.uber.org/zap" ) @@ -85,9 +86,11 @@ func (s *Scanner) Value() []byte { return nil } +const scannerNextMaxBackoff = 20000 + // Next return next element. func (s *Scanner) Next() error { - bo := NewBackofferWithVars(context.WithValue(context.Background(), TxnStartKey, s.snapshot.version), scannerNextMaxBackoff, s.snapshot.vars) + bo := retry.NewBackofferWithVars(context.WithValue(context.Background(), retry.TxnStartKey, s.snapshot.version), scannerNextMaxBackoff, s.snapshot.vars) if !s.valid { return errors.New("scanner iterator is invalid") } @@ -223,7 +226,7 @@ func (s *Scanner) getData(bo *Backoffer) error { if regionErr != nil { logutil.BgLogger().Debug("scanner getData failed", zap.Stringer("regionErr", regionErr)) - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return errors.Trace(err) } @@ -251,7 +254,7 @@ func (s *Scanner) getData(bo *Backoffer) error { return errors.Trace(err) } if msBeforeExpired > 0 { - err = bo.BackoffWithMaxSleep(BoTxnLockFast, int(msBeforeExpired), errors.Errorf("key is locked during scanning")) + err = bo.BackoffWithMaxSleep(retry.BoTxnLockFast, int(msBeforeExpired), errors.Errorf("key is locked during scanning")) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 2b9926c7a2b9a..efcb5464891a0 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/store/tikv/unionstore" "github.com/pingcap/tidb/store/tikv/util" @@ -126,6 +127,8 @@ func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *KVSnaps } } +const batchGetMaxBackoff = 20000 + // SetSnapshotTS resets the timestamp for reads. func (s *KVSnapshot) SetSnapshotTS(ts uint64) { // Sanity check for snapshot version. @@ -170,8 +173,8 @@ func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][] // We want [][]byte instead of []kv.Key, use some magic to save memory. 
bytesKeys := *(*[][]byte)(unsafe.Pointer(&keys)) - ctx = context.WithValue(ctx, TxnStartKey, s.version) - bo := NewBackofferWithVars(ctx, batchGetMaxBackoff, s.vars) + ctx = context.WithValue(ctx, retry.TxnStartKey, s.version) + bo := retry.NewBackofferWithVars(ctx, batchGetMaxBackoff, s.vars) // Create a map to collect key-values from region servers. var mu sync.Mutex @@ -331,7 +334,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec return errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return errors.Trace(err) } @@ -378,7 +381,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec return errors.Trace(err) } if msBeforeExpired > 0 { - err = bo.BackoffWithMaxSleep(BoTxnLockFast, int(msBeforeExpired), errors.Errorf("batchGet lockedKeys: %d", len(lockedKeys))) + err = bo.BackoffWithMaxSleep(retry.BoTxnLockFast, int(msBeforeExpired), errors.Errorf("batchGet lockedKeys: %d", len(lockedKeys))) if err != nil { return errors.Trace(err) } @@ -394,6 +397,8 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec } } +const getMaxBackoff = 20000 + // Get gets the value for key k from snapshot. func (s *KVSnapshot) Get(ctx context.Context, k []byte) ([]byte, error) { @@ -401,8 +406,8 @@ func (s *KVSnapshot) Get(ctx context.Context, k []byte) ([]byte, error) { metrics.TxnCmdHistogramWithGet.Observe(time.Since(start).Seconds()) }(time.Now()) - ctx = context.WithValue(ctx, TxnStartKey, s.version) - bo := NewBackofferWithVars(ctx, getMaxBackoff, s.vars) + ctx = context.WithValue(ctx, retry.TxnStartKey, s.version) + bo := retry.NewBackofferWithVars(ctx, getMaxBackoff, s.vars) val, err := s.get(ctx, bo, k) s.recordBackoffInfo(bo) if err != nil { @@ -436,7 +441,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte, opentracing.ContextWithSpan(ctx, span1) } failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) { - if bo.ctx.Value("TestSnapshotCache") != nil { + if bo.GetCtx().Value("TestSnapshotCache") != nil { panic("cache miss") } }) @@ -488,7 +493,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte, return nil, errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { return nil, errors.Trace(err) } @@ -522,7 +527,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte, return nil, errors.Trace(err) } if msBeforeExpired > 0 { - err = bo.BackoffWithMaxSleep(BoTxnLockFast, int(msBeforeExpired), errors.New(keyErr.String())) + err = bo.BackoffWithMaxSleep(retry.BoTxnLockFast, int(msBeforeExpired), errors.New(keyErr.String())) if err != nil { return nil, errors.Trace(err) } @@ -679,7 +684,7 @@ func extractKeyErr(keyErr *pb.KeyError) error { func (s *KVSnapshot) recordBackoffInfo(bo *Backoffer) { s.mu.RLock() - if s.mu.stats == nil || bo.totalSleep == 0 { + if s.mu.stats == nil || bo.GetTotalSleep() == 0 { s.mu.RUnlock() return } @@ -690,14 +695,14 @@ func (s *KVSnapshot) recordBackoffInfo(bo *Backoffer) { return } if s.mu.stats.backoffSleepMS == nil { - s.mu.stats.backoffSleepMS = bo.backoffSleepMS - s.mu.stats.backoffTimes = bo.backoffTimes + s.mu.stats.backoffSleepMS = bo.GetBackoffSleepMS() + s.mu.stats.backoffTimes = 
bo.GetBackoffTimes() return } - for k, v := range bo.backoffSleepMS { + for k, v := range bo.GetBackoffSleepMS() { s.mu.stats.backoffSleepMS[k] += v } - for k, v := range bo.backoffTimes { + for k, v := range bo.GetBackoffTimes() { s.mu.stats.backoffTimes[k] += v } } @@ -726,8 +731,8 @@ func (s *KVSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RPCRunti // SnapshotRuntimeStats records the runtime stats of snapshot. type SnapshotRuntimeStats struct { rpcStats RegionRequestRuntimeStats - backoffSleepMS map[BackoffType]int - backoffTimes map[BackoffType]int + backoffSleepMS map[retry.BackoffType]int + backoffTimes map[retry.BackoffType]int scanDetail *util.ScanDetail timeDetail *util.TimeDetail } @@ -741,8 +746,8 @@ func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats { } } if len(rs.backoffSleepMS) > 0 { - newRs.backoffSleepMS = make(map[BackoffType]int) - newRs.backoffTimes = make(map[BackoffType]int) + newRs.backoffSleepMS = make(map[retry.BackoffType]int) + newRs.backoffTimes = make(map[retry.BackoffType]int) for k, v := range rs.backoffSleepMS { newRs.backoffSleepMS[k] += v } @@ -763,10 +768,10 @@ func (rs *SnapshotRuntimeStats) Merge(other *SnapshotRuntimeStats) { } if len(other.backoffSleepMS) > 0 { if rs.backoffSleepMS == nil { - rs.backoffSleepMS = make(map[BackoffType]int) + rs.backoffSleepMS = make(map[retry.BackoffType]int) } if rs.backoffTimes == nil { - rs.backoffTimes = make(map[BackoffType]int) + rs.backoffTimes = make(map[retry.BackoffType]int) } for k, v := range other.backoffSleepMS { rs.backoffSleepMS[k] += v diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 5839aa4d73c96..38ce24917d1cf 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -27,6 +27,7 @@ import ( tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/store/tikv/util" pd "github.com/tikv/pd/client" @@ -56,7 +57,7 @@ func (s *KVStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter boo return nil, nil } // The first time it enters this function. 
- if bo.totalSleep == 0 { + if bo.GetTotalSleep() == 0 { logutil.BgLogger().Info("split batch regions request", zap.Int("split key count", len(keys)), zap.Int("batch count", len(batches)), @@ -76,8 +77,8 @@ func (s *KVStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter boo util.WithRecovery(func() { select { case ch <- s.batchSendSingleRegion(backoffer, b, scatter, tableID): - case <-bo.ctx.Done(): - ch <- singleBatchResp{err: bo.ctx.Err()} + case <-bo.GetCtx().Done(): + ch <- singleBatchResp{err: bo.GetCtx().Err()} } }, func(r interface{}) { if r != nil { @@ -110,8 +111,8 @@ func (s *KVStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter boo func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool, tableID *int64) singleBatchResp { if val, err := util.MockSplitRegionTimeout.Eval(); err == nil { if val.(bool) { - if _, ok := bo.ctx.Deadline(); ok { - <-bo.ctx.Done() + if _, ok := bo.GetCtx().Deadline(); ok { + <-bo.GetCtx().Done() } } } @@ -136,7 +137,7 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool return batchResp } if regionErr != nil { - err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { batchResp.err = errors.Trace(err) return batchResp @@ -192,9 +193,14 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool return batchResp } +const ( + splitRegionBackoff = 20000 + maxSplitRegionsBackoff = 120000 +) + // SplitRegions splits regions by splitKeys. func (s *KVStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatter bool, tableID *int64) (regionIDs []uint64, err error) { - bo := NewBackofferWithVars(ctx, int(math.Min(float64(len(splitKeys))*splitRegionBackoff, maxSplitRegionsBackoff)), nil) + bo := retry.NewBackofferWithVars(ctx, int(math.Min(float64(len(splitKeys))*splitRegionBackoff, maxSplitRegionsBackoff)), nil) resp, err := s.splitBatchRegionsReq(bo, splitKeys, scatter, tableID) regionIDs = make([]uint64, 0, len(splitKeys)) if resp != nil && resp.Resp != nil { @@ -215,7 +221,7 @@ func (s *KVStore) scatterRegion(bo *Backoffer, regionID uint64, tableID *int64) if tableID != nil { opts = append(opts, pd.WithGroup(fmt.Sprintf("%v", *tableID))) } - _, err := s.pdClient.ScatterRegions(bo.ctx, []uint64{regionID}, opts...) + _, err := s.pdClient.ScatterRegions(bo.GetCtx(), []uint64{regionID}, opts...) if val, err2 := util.MockScatterRegionTimeout.Eval(); err2 == nil { if val.(bool) { @@ -226,7 +232,7 @@ func (s *KVStore) scatterRegion(bo *Backoffer, regionID uint64, tableID *int64) if err == nil { break } - err = bo.Backoff(BoPDRPC, errors.New(err.Error())) + err = bo.Backoff(retry.BoPDRPC, errors.New(err.Error())) if err != nil { return errors.Trace(err) } @@ -273,6 +279,8 @@ func (s *KVStore) preSplitRegion(ctx context.Context, group groupedMutations) bo return true } +const waitScatterRegionFinishBackoff = 120000 + // WaitScatterRegionFinish implements SplittableStore interface. // backOff is the back off time of the wait scatter region.(Milliseconds) // if backOff <= 0, the default wait scatter back off time will be used. 
@@ -283,7 +291,7 @@ func (s *KVStore) WaitScatterRegionFinish(ctx context.Context, regionID uint64, logutil.BgLogger().Info("wait scatter region", zap.Uint64("regionID", regionID), zap.Int("backoff(ms)", backOff)) - bo := NewBackofferWithVars(ctx, backOff, nil) + bo := retry.NewBackofferWithVars(ctx, backOff, nil) logFreq := 0 for { resp, err := s.pdClient.GetOperator(ctx, regionID) @@ -310,9 +318,9 @@ func (s *KVStore) WaitScatterRegionFinish(ctx context.Context, regionID uint64, logFreq++ } if err != nil { - err = bo.Backoff(BoRegionMiss, errors.New(err.Error())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(err.Error())) } else { - err = bo.Backoff(BoRegionMiss, errors.New("wait scatter region timeout")) + err = bo.Backoff(retry.BoRegionMiss, errors.New("wait scatter region timeout")) } if err != nil { return errors.Trace(err) @@ -322,7 +330,7 @@ func (s *KVStore) WaitScatterRegionFinish(ctx context.Context, regionID uint64, // CheckRegionInScattering uses to check whether scatter region finished. func (s *KVStore) CheckRegionInScattering(regionID uint64) (bool, error) { - bo := NewBackofferWithVars(context.Background(), locateRegionMaxBackoff, nil) + bo := retry.NewBackofferWithVars(context.Background(), locateRegionMaxBackoff, nil) for { resp, err := s.pdClient.GetOperator(context.Background(), regionID) if err == nil && resp != nil { @@ -331,7 +339,7 @@ func (s *KVStore) CheckRegionInScattering(regionID uint64) (bool, error) { } } if err != nil { - err = bo.Backoff(BoRegionMiss, errors.New(err.Error())) + err = bo.Backoff(retry.BoRegionMiss, errors.New(err.Error())) } else { return true, nil } diff --git a/store/tikv/test_probe.go b/store/tikv/test_probe.go index a6a4f8d826655..1a8dc5062218d 100644 --- a/store/tikv/test_probe.go +++ b/store/tikv/test_probe.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/store/tikv/unionstore" pd "github.com/tikv/pd/client" @@ -66,7 +67,7 @@ func (s StoreProbe) ClearTxnLatches() { // SendTxnHeartbeat renews a txn's ttl. func (s StoreProbe) SendTxnHeartbeat(ctx context.Context, key []byte, startTS uint64, ttl uint64) (uint64, error) { - bo := NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil) + bo := retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil) return sendTxnHeartBeat(bo, s.KVStore, key, startTS, ttl) } @@ -266,12 +267,12 @@ func (c CommitterProbe) PrewriteAllMutations(ctx context.Context) error { // PrewriteMutations performs the first phase of commit for given keys. func (c CommitterProbe) PrewriteMutations(ctx context.Context, mutations CommitterMutations) error { - return c.prewriteMutations(NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil), mutations) + return c.prewriteMutations(retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil), mutations) } // CommitMutations performs the second phase of commit. func (c CommitterProbe) CommitMutations(ctx context.Context) error { - return c.commitMutations(NewBackofferWithVars(ctx, int(atomic.LoadUint64(&CommitMaxBackoff)), nil), c.mutationsOfKeys([][]byte{c.primaryKey})) + return c.commitMutations(retry.NewBackofferWithVars(ctx, int(atomic.LoadUint64(&CommitMaxBackoff)), nil), c.mutationsOfKeys([][]byte{c.primaryKey})) } // MutationsOfKeys returns mutations match the keys. 
@@ -281,7 +282,7 @@ func (c CommitterProbe) MutationsOfKeys(keys [][]byte) CommitterMutations { // PessimisticRollbackMutations rolls mutations back. func (c CommitterProbe) PessimisticRollbackMutations(ctx context.Context, muts CommitterMutations) error { - return c.pessimisticRollbackMutations(NewBackofferWithVars(ctx, pessimisticRollbackMaxBackoff, nil), muts) + return c.pessimisticRollbackMutations(retry.NewBackofferWithVars(ctx, pessimisticRollbackMaxBackoff, nil), muts) } // Cleanup cleans dirty data of a committer. @@ -366,7 +367,7 @@ func (c CommitterProbe) SetPrimaryKeyBlocker(ac, bk chan struct{}) { // CleanupMutations performs the clean up phase. func (c CommitterProbe) CleanupMutations(ctx context.Context) error { - bo := NewBackofferWithVars(ctx, cleanupMaxBackoff, nil) + bo := retry.NewBackofferWithVars(ctx, cleanupMaxBackoff, nil) return c.cleanupMutations(bo, c.mutations) } @@ -434,13 +435,13 @@ func (l LockResolverProbe) ResolveLockAsync(bo *Backoffer, lock *Lock, status Tx // ResolveLock resolves single lock. func (l LockResolverProbe) ResolveLock(ctx context.Context, lock *Lock) error { - bo := NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, nil) + bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, nil) return l.resolveLock(bo, lock, TxnStatus{}, false, make(map[RegionVerID]struct{})) } // ResolvePessimisticLock resolves single pessimistic lock. func (l LockResolverProbe) ResolvePessimisticLock(ctx context.Context, lock *Lock) error { - bo := NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, nil) + bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, nil) return l.resolvePessimisticLock(bo, lock, make(map[RegionVerID]struct{})) } diff --git a/store/tikv/tests/lock_test.go b/store/tikv/tests/lock_test.go index 3c6c652d96041..f32991877fefd 100644 --- a/store/tikv/tests/lock_test.go +++ b/store/tikv/tests/lock_test.go @@ -493,7 +493,7 @@ func (s *testLockSuite) TestBatchResolveLocks(c *C) { c.Assert(msBeforeLockExpired, Greater, int64(0)) lr := s.store.NewLockResolver() - bo := tikv.NewBackofferWithVars(context.Background(), tikv.GcResolveLockMaxBackoff, nil) + bo := tikv.NewGcResolveLockMaxBackoffer(context.Background()) loc, err := s.store.GetRegionCache().LocateKey(bo, locks[0].Primary) c.Assert(err, IsNil) // Check BatchResolveLocks resolve the lock even the ttl is not expired. 
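The GC worker and the lock test above now call tikv.NewGcResolveLockMaxBackoffer(ctx) instead of spelling out NewBackofferWithVars with an exported constant. That wrapper lives in store/tikv and its body is not part of these hunks; a plausible shape, with the backoff budget assumed for illustration rather than copied from the repository, is:

package tikv // sketch of the compatibility layer kept in store/tikv

import (
	"context"

	"github.com/pingcap/tidb/store/tikv/retry"
)

// Backoffer keeps its old name for existing callers; it is now an alias of retry.Backoffer.
type Backoffer = retry.Backoffer

// gcResolveLockMaxBackoff is an assumed value; the real constant is defined elsewhere in the package.
const gcResolveLockMaxBackoff = 100000

// NewGcResolveLockMaxBackoffer replaces the old NewBackofferWithVars(ctx, GcResolveLockMaxBackoff, nil) call sites.
func NewGcResolveLockMaxBackoffer(ctx context.Context) *Backoffer {
	return retry.NewBackofferWithVars(ctx, gcResolveLockMaxBackoff, nil)
}

Keeping these one-line constructors inside store/tikv lets callers such as the GC worker keep a plain tikv import instead of depending on the retry package directly.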
diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 4e462653c415c..122f850b74940 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/unionstore" "github.com/pingcap/tidb/store/tikv/util" "go.uber.org/zap" @@ -84,7 +85,7 @@ type KVTxn struct { } func newTiKVTxn(store *KVStore, txnScope string) (*KVTxn, error) { - bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) + bo := retry.NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) startTS, err := store.getTimestampWithRetry(bo, txnScope) if err != nil { return nil, errors.Trace(err) @@ -108,7 +109,7 @@ func newTiKVTxnWithStartTS(store *KVStore, txnScope string, startTS uint64, repl } func newTiKVTxnWithExactStaleness(store *KVStore, txnScope string, prevSec uint64) (*KVTxn, error) { - bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) + bo := retry.NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) startTS, err := store.getStalenessTimestamp(bo, txnScope, prevSec) if err != nil { return nil, errors.Trace(err) @@ -383,7 +384,7 @@ func (txn *KVTxn) rollbackPessimisticLocks() error { if txn.lockedCnt == 0 { return nil } - bo := NewBackofferWithVars(context.Background(), cleanupMaxBackoff, txn.vars) + bo := retry.NewBackofferWithVars(context.Background(), cleanupMaxBackoff, txn.vars) keys := txn.collectLockedKeys() return txn.committer.pessimisticRollbackMutations(bo, &PlainMutations{keys: keys}) } @@ -524,16 +525,16 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput . lockCtx.Stats = &util.LockKeysDetails{ LockKeys: int32(len(keys)), } - bo := NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars) + bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars) txn.committer.forUpdateTS = lockCtx.ForUpdateTS // If the number of keys greater than 1, it can be on different region, // concurrently execute on multiple regions may lead to deadlock. txn.committer.isFirstLock = txn.lockedCnt == 0 && len(keys) == 1 err = txn.committer.pessimisticLockMutations(bo, lockCtx, &PlainMutations{keys: keys}) - if bo.totalSleep > 0 { - atomic.AddInt64(&lockCtx.Stats.BackoffTime, int64(bo.totalSleep)*int64(time.Millisecond)) + if bo.GetTotalSleep() > 0 { + atomic.AddInt64(&lockCtx.Stats.BackoffTime, int64(bo.GetTotalSleep())*int64(time.Millisecond)) lockCtx.Stats.Mu.Lock() - lockCtx.Stats.Mu.BackoffTypes = append(lockCtx.Stats.Mu.BackoffTypes, bo.types...) + lockCtx.Stats.Mu.BackoffTypes = append(lockCtx.Stats.Mu.BackoffTypes, bo.GetTypes()...) lockCtx.Stats.Mu.Unlock() } if lockCtx.Killed != nil { @@ -603,6 +604,8 @@ func deduplicateKeys(keys [][]byte) [][]byte { return deduped } +const pessimisticRollbackMaxBackoff = 20000 + func (txn *KVTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte) *sync.WaitGroup { // Clone a new committer for execute in background. 
committer := &twoPhaseCommitter{ @@ -631,7 +634,7 @@ func (txn *KVTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte) * } }) - err := committer.pessimisticRollbackMutations(NewBackofferWithVars(ctx, pessimisticRollbackMaxBackoff, txn.vars), &PlainMutations{keys: keys}) + err := committer.pessimisticRollbackMutations(retry.NewBackofferWithVars(ctx, pessimisticRollbackMaxBackoff, txn.vars), &PlainMutations{keys: keys}) if err != nil { logutil.Logger(ctx).Warn("[kv] pessimisticRollback failed.", zap.Error(err)) } From e77c20b4e91accf59429e2c1cf794176b13bfbb8 Mon Sep 17 00:00:00 2001 From: shirly Date: Tue, 11 May 2021 11:15:51 +0800 Subject: [PATCH 2/3] stpre/copr: fix tests caused by usage of tikv.TxnStartKey --- store/copr/batch_coprocessor.go | 4 ++-- store/copr/coprocessor.go | 11 +++++++---- store/copr/mpp.go | 4 ++-- store/gcworker/gc_worker_test.go | 5 +++-- store/tikv/backoff.go | 1 - store/tikv/client_batch.go | 2 +- store/tikv/region_request.go | 2 +- store/tikv/retry/backoff.go | 15 ++++++++++----- 8 files changed, 26 insertions(+), 18 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 5506c9d497ac2..ef3bcee5719ed 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -176,7 +176,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V if req.KeepOrder || req.Desc { return copErrorResponse{errors.New("batch coprocessor cannot prove keep order or desc property")} } - ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs) + ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs) bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) ranges := toTiKVKeyRanges(req.KeyRanges) tasks, err := buildBatchCopTasks(bo, c.store.GetRegionCache(), ranges, req.StoreType) @@ -386,7 +386,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b return nil } - if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil { + if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil { return errors.Trace(err) } diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index cb799edb16f70..f32146bea82ee 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -73,7 +73,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa logutil.BgLogger().Debug("send batch requests") return c.sendBatch(ctx, req, vars) } - ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs) + ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs) bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) ranges := toTiKVKeyRanges(req.KeyRanges) tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req) @@ -830,11 +830,14 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *ti return nil, nil } - boRPCType := tikv.BoTiKVRPC + err1 := errors.Errorf("recv stream response error: %v, task: %s", err, task) if task.storeType == kv.TiFlash { - boRPCType = tikv.BoTiFlashRPC + err1 = bo.Backoff(tikv.BoTiFlashRPC, err1) + } else { + err1 = bo.b.BackoffTiKVRPC(err1) } - if err1 := bo.Backoff(boRPCType, errors.Errorf("recv stream response error: %v, task: %s", err, task)); err1 != nil { + + if err1 != nil { return nil, errors.Trace(err) } diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 
3ea07e744f9b9..812638411fd3d 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -56,7 +56,7 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta { // ConstructMPPTasks receives ScheduleRequest, which are actually collects of kv ranges. We allocates MPPTaskMeta for them and returns. func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) { - ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTS) + ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS) bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil) if req.KeyRanges == nil { return c.selectAllTiFlashStore(), nil @@ -344,7 +344,7 @@ func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchReques return } - if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v", err)); err1 != nil { + if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil { if errors.Cause(err) == context.Canceled { logutil.BgLogger().Info("stream recv timeout", zap.Error(err)) } else { diff --git a/store/gcworker/gc_worker_test.go b/store/gcworker/gc_worker_test.go index 3bfd616929aec..bc09651e0d379 100644 --- a/store/gcworker/gc_worker_test.go +++ b/store/gcworker/gc_worker_test.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tidb/store/tikv/mockstore/mocktikv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/oracle/oracles" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" pd "github.com/tikv/pd/client" ) @@ -412,7 +413,7 @@ func (s *testGCWorkerSuite) TestStatusVars(c *C) { func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) { ctx := context.Background() - bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil) + bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil) loc, err := s.tikvStore.GetRegionCache().LocateKey(bo, []byte("")) c.Assert(err, IsNil) var regionErr *errorpb.Error @@ -943,7 +944,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionEnlargeCausedByRegionM mCluster.Merge(s.initRegion.regionID, region2) regionMeta, _ := mCluster.GetRegion(s.initRegion.regionID) err := s.tikvStore.GetRegionCache().OnRegionEpochNotMatch( - tikv.NewNoopBackoff(context.Background()), + retry.NewNoopBackoff(context.Background()), &tikv.RPCContext{Region: regionID, Store: &tikv.Store{}}, []*metapb.Region{regionMeta}) c.Assert(err, IsNil) diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index c471ccc16167a..c622e21d2ee5d 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -29,7 +29,6 @@ type BackoffType = retry.BackoffType // Back off types. const ( BoRegionMiss = retry.BoRegionMiss - BoTiKVRPC = retry.BoTiKVRPC BoTiFlashRPC = retry.BoTiFlashRPC BoTxnLockFast = retry.BoTxnLockFast BoTxnLock = retry.BoTxnLock diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index 70f1cf27ccacc..e5ec039fc6911 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -673,7 +673,7 @@ func (c *batchCommandsClient) recreateStreamingClient(err error, streamClient *b break } - err2 := b.Backoff(retry.BoTiKVRPC, err1) + err2 := b.BackoffTiKVRPC(err1) // As timeout is set to math.MaxUint32, err2 should always be nil. // This line is added to make the 'make errcheck' pass. 
terror.Log(err2) diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 9aa55baa64cfb..ec5b92f81c5ad 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -580,7 +580,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash { err = bo.Backoff(retry.BoTiFlashRPC, errors.Errorf("send tiflash request error: %v, ctx: %v, try next peer later", err, ctx)) } else { - err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx)) + err = bo.BackoffTiKVRPC(errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx)) } return errors.Trace(err) } diff --git a/store/tikv/retry/backoff.go b/store/tikv/retry/backoff.go index b2e0137902bce..a1a7c0619537f 100644 --- a/store/tikv/retry/backoff.go +++ b/store/tikv/retry/backoff.go @@ -49,7 +49,7 @@ const ( func (t BackoffType) metric() prometheus.Observer { switch t { // TODO: distinguish tikv and tiflash in metrics - case BoTiKVRPC, BoTiFlashRPC: + case boTiKVRPC, BoTiFlashRPC: return metrics.BackoffHistogramRPC case BoTxnLock: return metrics.BackoffHistogramLock @@ -121,7 +121,7 @@ type BackoffType int // Back off types. const ( - BoTiKVRPC BackoffType = iota + boTiKVRPC BackoffType = iota BoTiFlashRPC BoTxnLock BoTxnLockFast @@ -139,7 +139,7 @@ func (t BackoffType) createFn(vars *kv.Variables) func(context.Context, int) int vars.Hook(t.String(), vars) } switch t { - case BoTiKVRPC, BoTiFlashRPC: + case boTiKVRPC, BoTiFlashRPC: return NewBackoffFn(100, 2000, EqualJitter) case BoTxnLock: return NewBackoffFn(200, 3000, EqualJitter) @@ -164,7 +164,7 @@ func (t BackoffType) createFn(vars *kv.Variables) func(context.Context, int) int func (t BackoffType) String() string { switch t { - case BoTiKVRPC: + case boTiKVRPC: return "tikvRPC" case BoTiFlashRPC: return "tiflashRPC" @@ -193,7 +193,7 @@ func (t BackoffType) String() string { // TError returns pingcap/error of the backoff type. func (t BackoffType) TError() error { switch t { - case BoTiKVRPC: + case boTiKVRPC: return tikverr.ErrTiKVServerTimeout case BoTiFlashRPC: return tikverr.ErrTiFlashServerTimeout @@ -279,6 +279,11 @@ func (b *Backoffer) Backoff(typ BackoffType, err error) error { return b.BackoffWithMaxSleep(typ, -1, err) } +// BackoffTiKVRPC calls Backoff with boTiKVRPC. +func (b *Backoffer) BackoffTiKVRPC(err error) error { + return b.Backoff(boTiKVRPC, err) +} + // BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message // and never sleep more than maxSleepMs for each sleep. func (b *Backoffer) BackoffWithMaxSleep(typ BackoffType, maxSleepMs int, err error) error { From 0553cdc3a8d3b3ae3383ad57df7f32696351e388 Mon Sep 17 00:00:00 2001 From: shirly Date: Tue, 11 May 2021 16:35:37 +0800 Subject: [PATCH 3/3] store/tikv: make some tiny refactor Signed-off-by: shirly --- store/tikv/2pc.go | 2 +- store/tikv/prewrite.go | 2 +- store/tikv/region_cache.go | 8 ++++---- store/tikv/retry/backoff.go | 5 ----- 4 files changed, 6 insertions(+), 11 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 7a1e55ae3ea05..5b01c3de83a16 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -543,7 +543,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh switch act := action.(type) { case actionPrewrite: // Do not update regionTxnSize on retries. They are not used when building a PrewriteRequest. 
- if !bo.HasErrors() { + if bo.ErrorsNum() == 0 { for _, group := range groups { c.regionTxnSize[group.region.id] = group.mutations.Len() } diff --git a/store/tikv/prewrite.go b/store/tikv/prewrite.go index ad2c3716b264a..7097ba5dbcd3e 100644 --- a/store/tikv/prewrite.go +++ b/store/tikv/prewrite.go @@ -151,7 +151,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff txnSize := uint64(c.regionTxnSize[batch.region.id]) // When we retry because of a region miss, we don't know the transaction size. We set the transaction size here // to MaxUint64 to avoid unexpected "resolve lock lite". - if bo.HasErrors() { + if bo.ErrorsNum() > 0 { txnSize = math.MaxUint64 } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 2528145675e0c..f6225a2724f8e 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -42,7 +42,7 @@ import ( "go.uber.org/zap" "golang.org/x/sync/singleflight" "google.golang.org/grpc" - gbackoff "google.golang.org/grpc/backoff" + "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" @@ -426,7 +426,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe } if cachedRegion.checkNeedReload() { - // TODO: This may cause a fake EpochNotMatch error, and reload the region after a retry. It's better to reload + // TODO: This may cause a fake EpochNotMatch error, and reload the region after a backoff. It's better to reload // the region directly here. return nil, nil } @@ -1406,7 +1406,7 @@ func (c *RegionCache) getStoresByLabels(labels []*metapb.StoreLabel) []*Store { // OnRegionEpochNotMatch removes the old region and inserts new regions into the cache. func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, currentRegions []*metapb.Region) error { - // Find whether the region epoch in `ctx` is ahead of TiKV's. If so, retry. + // Find whether the region epoch in `ctx` is ahead of TiKV's. If so, backoff. for _, meta := range currentRegions { if meta.GetId() == ctx.Region.id && (meta.GetRegionEpoch().GetConfVer() < ctx.Region.confVer || @@ -2111,7 +2111,7 @@ func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, h grpc.WithInitialWindowSize(grpcInitialWindowSize), grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize), grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: gbackoff.Config{ + Backoff: backoff.Config{ BaseDelay: 100 * time.Millisecond, // Default was 1s. Multiplier: 1.6, // Default Jitter: 0.2, // Default diff --git a/store/tikv/retry/backoff.go b/store/tikv/retry/backoff.go index a1a7c0619537f..24dc9174f3fec 100644 --- a/store/tikv/retry/backoff.go +++ b/store/tikv/retry/backoff.go @@ -426,11 +426,6 @@ func (b *Backoffer) GetBackoffSleepMS() map[BackoffType]int { return b.backoffSleepMS } -// HasErrors returns true if b.errors is not empty. -func (b *Backoffer) HasErrors() bool { - return len(b.errors) != 0 -} - // ErrorsNum returns the number of errors. func (b *Backoffer) ErrorsNum() int { return len(b.errors)
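}

Patch 2 unexports BoTiKVRPC and routes callers through the new Backoffer.BackoffTiKVRPC helper, while patch 3 drops the short-lived HasErrors in favour of ErrorsNum. A rough sketch of how the two pieces combine after this series, assuming only the method signatures shown above; sendOnce and the 20000 ms budget are illustrative placeholders, not code from this patch:

    package example

    import (
        "context"

        "github.com/pingcap/tidb/store/tikv/retry"
    )

    // sendWithRetry keeps calling sendOnce, sleeping with the TiKV RPC
    // backoff policy between failures, until success or the backoffer
    // gives up.
    func sendWithRetry(ctx context.Context, sendOnce func(context.Context) error) (retries int, err error) {
        bo := retry.NewBackofferWithVars(ctx, 20000, nil) // sleep budget in ms; value is illustrative
        for {
            if err = sendOnce(ctx); err == nil {
                // ErrorsNum (the replacement for HasErrors) counts the errors
                // the backoffer recorded, i.e. the failed attempts so far.
                return bo.ErrorsNum(), nil
            }
            // The TiKV RPC backoff type is no longer exported, so callers use
            // the dedicated helper instead of bo.Backoff(BoTiKVRPC, err).
            if berr := bo.BackoffTiKVRPC(err); berr != nil {
                // The sleep budget is exhausted or the context was cancelled.
                return bo.ErrorsNum(), berr
            }
        }
    }

This is the same shape as the copr and client_batch call sites above, which now call BackoffTiKVRPC without referring to the backoff type at all.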