From 769062b3188d5b51240c2d0cda88d3718ea82634 Mon Sep 17 00:00:00 2001 From: Shirly Date: Thu, 13 May 2021 18:35:39 +0800 Subject: [PATCH] =?UTF-8?q?store/driver:=20move=20backoff=20driver=20into?= =?UTF-8?q?=20single=20package=20so=20we=20can=20use=20i=E2=80=A6=20(#2462?= =?UTF-8?q?4)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- store/copr/batch_coprocessor.go | 19 ++++--- store/copr/batch_request_sender.go | 4 +- store/copr/coprocessor.go | 23 ++++---- store/copr/coprocessor_test.go | 5 +- store/copr/mpp.go | 13 +++-- store/copr/store.go | 62 +------------------- store/driver/backoff/backoff.go | 90 ++++++++++++++++++++++++++++++ store/tikv/retry/backoff.go | 3 +- 8 files changed, 129 insertions(+), 90 deletions(-) create mode 100644 store/driver/backoff/backoff.go diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index c070f25a454da..b0c0ad5c9ea7b 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/driver/backoff" derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" @@ -97,7 +98,7 @@ type copTaskAndRPCContext struct { ctx *tikv.RPCContext } -func buildBatchCopTasks(bo *backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) { +func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) { start := time.Now() const cmdType = tikvrpc.CmdBatchCop rangesLen := ranges.Len() @@ -178,7 +179,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V return copErrorResponse{errors.New("batch coprocessor cannot prove keep order or desc property")} } ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs) - bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) + bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) ranges := toTiKVKeyRanges(req.KeyRanges) tasks, err := buildBatchCopTasks(bo, c.store.GetRegionCache(), ranges, req.StoreType) if err != nil { @@ -223,7 +224,7 @@ func (b *batchCopIterator) run(ctx context.Context) { // We run workers for every batch cop. for _, task := range b.tasks { b.wg.Add(1) - bo := newBackofferWithVars(ctx, copNextMaxBackoff, b.vars) + bo := backoff.NewBackofferWithVars(ctx, copNextMaxBackoff, b.vars) go b.handleTask(ctx, bo, task) } b.wg.Wait() @@ -293,7 +294,7 @@ func (b *batchCopIterator) Close() error { return nil } -func (b *batchCopIterator) handleTask(ctx context.Context, bo *backoffer, task *batchCopTask) { +func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *batchCopTask) { tasks := []*batchCopTask{task} for idx := 0; idx < len(tasks); idx++ { ret, err := b.handleTaskOnce(ctx, bo, tasks[idx]) @@ -308,7 +309,7 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *backoffer, task * } // Merge all ranges and request again. 
-func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) { +func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) { var ranges []tikvstore.KeyRange for _, taskCtx := range batchTask.copTasks { taskCtx.task.ranges.Do(func(ran *tikvstore.KeyRange) { @@ -318,7 +319,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoffer, return buildBatchCopTasks(bo, b.store.GetRegionCache(), tikv.NewKeyRanges(ranges), b.req.StoreType) } -func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoffer, task *batchCopTask) ([]*batchCopTask, error) { +func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) { sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient()) var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.copTasks)) for _, task := range task.copTasks { @@ -363,7 +364,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoffer, ta return nil, b.handleStreamedBatchCopResponse(ctx, bo, resp.Resp.(*tikvrpc.BatchCopStreamResponse), task) } -func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, bo *backoffer, response *tikvrpc.BatchCopStreamResponse, task *batchCopTask) (err error) { +func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, bo *Backoffer, response *tikvrpc.BatchCopStreamResponse, task *batchCopTask) (err error) { defer response.Close() resp := response.BatchResponse if resp == nil { @@ -381,7 +382,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b return nil } - if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil { + if err1 := bo.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil { return errors.Trace(err) } @@ -396,7 +397,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b } } -func (b *batchCopIterator) handleBatchCopResponse(bo *backoffer, response *coprocessor.BatchResponse, task *batchCopTask) (err error) { +func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *coprocessor.BatchResponse, task *batchCopTask) (err error) { if otherErr := response.GetOtherError(); otherErr != "" { err = errors.Errorf("other error: %s", otherErr) logutil.BgLogger().Warn("other error", diff --git a/store/copr/batch_request_sender.go b/store/copr/batch_request_sender.go index 139ee087ec290..422306382337d 100644 --- a/store/copr/batch_request_sender.go +++ b/store/copr/batch_request_sender.go @@ -38,7 +38,7 @@ func NewRegionBatchRequestSender(cache *tikv.RegionCache, client tikv.Client) *R } } -func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *backoffer, ctxs []copTaskAndRPCContext, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) { +func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []copTaskAndRPCContext, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) { // use the first ctx to send request, because every ctx has same address. 
cancel = func() {} rpcCtx := ctxs[0].ctx @@ -67,7 +67,7 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *backoffer, ctxs []co return } -func (ss *RegionBatchRequestSender) onSendFail(bo *backoffer, ctxs []copTaskAndRPCContext, err error) error { +func (ss *RegionBatchRequestSender) onSendFail(bo *Backoffer, ctxs []copTaskAndRPCContext, err error) error { // If it failed because the context is cancelled by ourself, don't retry. if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled { return errors.Trace(err) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index dd8474fd75c3a..e9d9e6b8f1ebb 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" tidbmetrics "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/store/driver/backoff" derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/logutil" @@ -73,7 +74,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa return c.sendBatch(ctx, req, vars) } ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs) - bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) + bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) ranges := toTiKVKeyRanges(req.KeyRanges) tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req) if err != nil { @@ -144,7 +145,7 @@ func (r *copTask) String() string { // rangesPerTask limits the length of the ranges slice sent in one copTask. const rangesPerTask = 25000 -func buildCopTasks(bo *backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) { +func buildCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) { start := time.Now() cmdType := tikvrpc.CmdCop if req.Streaming { @@ -605,12 +606,12 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) { // Associate each region with an independent backoffer. In this way, when multiple regions are // unavailable, TiDB can execute very quickly without blocking -func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*backoffer, task *copTask, worker *copIteratorWorker) *backoffer { +func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*Backoffer, task *copTask, worker *copIteratorWorker) *Backoffer { bo, ok := backoffermap[task.region.GetID()] if ok { return bo } - newbo := newBackofferWithVars(ctx, copNextMaxBackoff, worker.vars) + newbo := backoff.NewBackofferWithVars(ctx, copNextMaxBackoff, worker.vars) backoffermap[task.region.GetID()] = newbo return newbo } @@ -629,7 +630,7 @@ func (worker *copIteratorWorker) handleTask(ctx context.Context, task *copTask, } }() remainTasks := []*copTask{task} - backoffermap := make(map[uint64]*backoffer) + backoffermap := make(map[uint64]*Backoffer) for len(remainTasks) > 0 { curTask := remainTasks[0] bo := chooseBackoffer(ctx, backoffermap, curTask, worker) @@ -657,7 +658,7 @@ func (worker *copIteratorWorker) handleTask(ctx context.Context, task *copTask, // handleTaskOnce handles single copTask, successful results are send to channel. // If error happened, returns error. If region split or meet lock, returns the remain tasks. 
-func (worker *copIteratorWorker) handleTaskOnce(bo *backoffer, task *copTask, ch chan<- *copResponse) ([]*copTask, error) { +func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch chan<- *copResponse) ([]*copTask, error) { failpoint.Inject("handleTaskOnceError", func(val failpoint.Value) { if val.(bool) { failpoint.Return(nil, errors.New("mock handleTaskOnce error")) @@ -747,7 +748,7 @@ const ( minLogKVProcessTime = 100 ) -func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *copTask, bo *backoffer, resp *tikvrpc.Response) { +func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *copTask, bo *Backoffer, resp *tikvrpc.Response) { logStr := fmt.Sprintf("[TIME_COP_PROCESS] resp_time:%s txnStartTS:%d region_id:%d store_addr:%s", costTime, worker.req.StartTs, task.region.GetID(), task.storeAddr) if bo.GetTotalSleep() > minLogBackoffTime { backoffTypes := strings.Replace(fmt.Sprintf("%v", bo.TiKVBackoffer().GetTypes()), " ", ",", -1) @@ -809,7 +810,7 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan return logStr } -func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *tikv.RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) { +func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *tikv.RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) { defer stream.Close() var resp *coprocessor.Response var lastRange *coprocessor.KeyRange @@ -833,7 +834,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *ti if task.storeType == kv.TiFlash { err1 = bo.Backoff(tikv.BoTiFlashRPC, err1) } else { - err1 = bo.b.BackoffTiKVRPC(err1) + err1 = bo.BackoffTiKVRPC(err1) } if err1 != nil { @@ -858,7 +859,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *ti // returns more tasks when that happens, or handles the response if no error. // if we're handling streaming coprocessor response, lastRange is the range of last // successful response, otherwise it's nil. 
-func (worker *copIteratorWorker) handleCopResponse(bo *backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) { +func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) { if regionErr := resp.pbResp.GetRegionError(); regionErr != nil { if rpcCtx != nil && task.storeType == kv.TiDB { resp.err = errors.Errorf("error: %v", regionErr) @@ -1015,7 +1016,7 @@ func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, return nil } -func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *backoffer, lastRange *coprocessor.KeyRange, task *copTask) ([]*copTask, error) { +func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRange *coprocessor.KeyRange, task *copTask) ([]*copTask, error) { remainedRanges := task.ranges if worker.req.Streaming && lastRange != nil { remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc) diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index d7a6d52c5b4bb..3bd34f05d95f9 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -19,6 +19,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/driver/backoff" "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/mockstore/mocktikv" @@ -43,7 +44,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) { cache := tikv.NewRegionCache(pdCli) defer cache.Close() - bo := newBackofferWithVars(context.Background(), 3000, nil) + bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) req := &kv.Request{} flashReq := &kv.Request{} @@ -212,7 +213,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) { pdCli := &tikv.CodecPDClient{Client: mocktikv.NewPDClient(cluster)} cache := tikv.NewRegionCache(pdCli) defer cache.Close() - bo := newBackofferWithVars(context.Background(), 3000, nil) + bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) req := &kv.Request{} tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req) diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 0d156de69fb20..9869fa501d430 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/driver/backoff" derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/logutil" @@ -56,7 +57,7 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta { // ConstructMPPTasks receives ScheduleRequest, which are actually collects of kv ranges. We allocates MPPTaskMeta for them and returns. 
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) { ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS) - bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil) + bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil) if req.KeyRanges == nil { return c.selectAllTiFlashStore(), nil } @@ -152,7 +153,7 @@ func (m *mppIterator) run(ctx context.Context) { break } m.wg.Add(1) - bo := newBackoffer(ctx, copNextMaxBackoff) + bo := backoff.NewBackoffer(ctx, copNextMaxBackoff) go m.handleDispatchReq(ctx, bo, task) } m.wg.Wait() @@ -176,7 +177,7 @@ func (m *mppIterator) sendToRespCh(resp *mppResponse) (exit bool) { // TODO:: Consider that which way is better: // - dispatch all tasks at once, and connect tasks at second. // - dispatch tasks and establish connection at the same time. -func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *backoffer, req *kv.MPPDispatchRequest) { +func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req *kv.MPPDispatchRequest) { defer func() { m.wg.Done() }() @@ -299,7 +300,7 @@ func (m *mppIterator) cancelMppTasks() { } } -func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) { +func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) { connReq := &mpp.EstablishMPPConnectionRequest{ SenderMeta: taskMeta, ReceiverMeta: &mpp.TaskMeta{ @@ -343,7 +344,7 @@ func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchReques return } - if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil { + if err1 := bo.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil { if errors.Cause(err) == context.Canceled { logutil.BgLogger().Info("stream recv timeout", zap.Error(err)) } else { @@ -366,7 +367,7 @@ func (m *mppIterator) Close() error { return nil } -func (m *mppIterator) handleMPPStreamResponse(bo *backoffer, response *mpp.MPPDataPacket, req *kv.MPPDispatchRequest) (err error) { +func (m *mppIterator) handleMPPStreamResponse(bo *Backoffer, response *mpp.MPPDataPacket, req *kv.MPPDispatchRequest) (err error) { if response.Error != nil { err = errors.Errorf("other error for mpp stream: %s", response.Error.Msg) logutil.BgLogger().Warn("other error", diff --git a/store/copr/store.go b/store/copr/store.go index d3f132f85238f..7fa4aeafb5135 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/driver/backoff" derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/config" @@ -122,62 +123,5 @@ func getEndPointType(t kv.StoreType) tikvrpc.EndpointType { } } -// backoffer wraps tikv.Backoffer and converts the error which returns by the functions of tikv.Backoffer to tidb error. -type backoffer struct { - b *tikv.Backoffer -} - -// newBackofferWithVars creates a Backoffer with maximum sleep time(in ms) and kv.Variables. -func newBackofferWithVars(ctx context.Context, maxSleep int, vars *kv.Variables) *backoffer { - b := tikv.NewBackofferWithVars(ctx, maxSleep, vars) - return &backoffer{b: b} -} - -func newBackoffer(ctx context.Context, maxSleep int) *backoffer { - b := tikv.NewBackoffer(ctx, maxSleep) - return &backoffer{b: b} -} - -// TiKVBackoffer returns tikv.Backoffer. 
-func (b *backoffer) TiKVBackoffer() *tikv.Backoffer {
-	return b.b
-}
-
-// Backoff sleeps a while base on the backoffType and records the error message.
-// It returns a retryable error if total sleep time exceeds maxSleep.
-func (b *backoffer) Backoff(typ tikv.BackoffType, err error) error {
-	e := b.b.Backoff(typ, err)
-	return derr.ToTiDBErr(e)
-}
-
-// BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message
-// and never sleep more than maxSleepMs for each sleep.
-func (b *backoffer) BackoffWithMaxSleep(typ tikv.BackoffType, maxSleepMs int, err error) error {
-	e := b.b.BackoffWithMaxSleep(typ, maxSleepMs, err)
-	return derr.ToTiDBErr(e)
-}
-
-// GetBackoffTimes returns a map contains backoff time count by type.
-func (b *backoffer) GetBackoffTimes() map[tikv.BackoffType]int {
-	return b.b.GetBackoffTimes()
-}
-
-// GetCtx returns the binded context.
-func (b *backoffer) GetCtx() context.Context {
-	return b.b.GetCtx()
-}
-
-// GetVars returns the binded vars.
-func (b *backoffer) GetVars() *tikv.Variables {
-	return b.b.GetVars()
-}
-
-// GetBackoffSleepMS returns a map contains backoff sleep time by type.
-func (b *backoffer) GetBackoffSleepMS() map[tikv.BackoffType]int {
-	return b.b.GetBackoffSleepMS()
-}
-
-// GetTotalSleep returns total sleep time.
-func (b *backoffer) GetTotalSleep() int {
-	return b.b.GetTotalSleep()
-}
+// Backoffer wraps tikv.Backoffer and converts the errors returned by tikv.Backoffer's functions into TiDB errors.
+type Backoffer = backoff.Backoffer
diff --git a/store/driver/backoff/backoff.go b/store/driver/backoff/backoff.go
new file mode 100644
index 0000000000000..f634366381d06
--- /dev/null
+++ b/store/driver/backoff/backoff.go
@@ -0,0 +1,90 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backoff
+
+import (
+	"context"
+
+	"github.com/pingcap/tidb/kv"
+	derr "github.com/pingcap/tidb/store/driver/error"
+	"github.com/pingcap/tidb/store/tikv"
+)
+
+// Backoffer wraps tikv.Backoffer and converts the errors returned by tikv.Backoffer's functions into TiDB errors.
+type Backoffer struct {
+	b *tikv.Backoffer
+}
+
+// NewBackofferWithVars creates a Backoffer with the maximum sleep time (in ms) and kv.Variables.
+func NewBackofferWithVars(ctx context.Context, maxSleep int, vars *kv.Variables) *Backoffer {
+	b := tikv.NewBackofferWithVars(ctx, maxSleep, vars)
+	return &Backoffer{b: b}
+}
+
+// NewBackoffer creates a Backoffer with the maximum sleep time (in ms).
+func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer {
+	b := tikv.NewBackoffer(ctx, maxSleep)
+	return &Backoffer{b: b}
+}
+
+// TiKVBackoffer returns the underlying tikv.Backoffer.
+func (b *Backoffer) TiKVBackoffer() *tikv.Backoffer {
+	return b.b
+}
+
+// Backoff sleeps a while based on the backoffType and records the error message.
+// It returns a retryable error if total sleep time exceeds maxSleep.
+func (b *Backoffer) Backoff(typ tikv.BackoffType, err error) error {
+	e := b.b.Backoff(typ, err)
+	return derr.ToTiDBErr(e)
+}
+
+// BackoffTiKVRPC sleeps a while based on the TiKVRPC backoff type and records the error message.
+// It returns a retryable error if total sleep time exceeds maxSleep.
+func (b *Backoffer) BackoffTiKVRPC(err error) error {
+	e := b.b.BackoffTiKVRPC(err)
+	return derr.ToTiDBErr(e)
+}
+
+// BackoffWithMaxSleep sleeps a while based on the backoffType and records the error message,
+// and never sleeps more than maxSleepMs for each sleep.
+func (b *Backoffer) BackoffWithMaxSleep(typ tikv.BackoffType, maxSleepMs int, err error) error {
+	e := b.b.BackoffWithMaxSleep(typ, maxSleepMs, err)
+	return derr.ToTiDBErr(e)
+}
+
+// GetBackoffTimes returns a map that contains the backoff count by type.
+func (b *Backoffer) GetBackoffTimes() map[tikv.BackoffType]int {
+	return b.b.GetBackoffTimes()
+}
+
+// GetCtx returns the bound context.
+func (b *Backoffer) GetCtx() context.Context {
+	return b.b.GetCtx()
+}
+
+// GetVars returns the bound vars.
+func (b *Backoffer) GetVars() *tikv.Variables {
+	return b.b.GetVars()
+}
+
+// GetBackoffSleepMS returns a map that contains the backoff sleep time by type.
+func (b *Backoffer) GetBackoffSleepMS() map[tikv.BackoffType]int {
+	return b.b.GetBackoffSleepMS()
+}
+
+// GetTotalSleep returns the total sleep time.
+func (b *Backoffer) GetTotalSleep() int {
+	return b.b.GetTotalSleep()
+}
diff --git a/store/tikv/retry/backoff.go b/store/tikv/retry/backoff.go
index 24dc9174f3fec..a563ec7359d22 100644
--- a/store/tikv/retry/backoff.go
+++ b/store/tikv/retry/backoff.go
@@ -279,7 +279,8 @@ func (b *Backoffer) Backoff(typ BackoffType, err error) error {
 	return b.BackoffWithMaxSleep(typ, -1, err)
 }
 
-// BackoffTiKVRPC calls Backoff with boTiKVRPC.
+// BackoffTiKVRPC sleeps a while based on the TiKVRPC backoff type and records the error message.
+// It returns a retryable error if total sleep time exceeds maxSleep.
 func (b *Backoffer) BackoffTiKVRPC(err error) error {
 	return b.Backoff(boTiKVRPC, err)
 }
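
For reference, a minimal usage sketch of the relocated package, assuming a hypothetical withRetry helper and an arbitrary 5000 ms budget; only backoff.NewBackoffer, Backoffer.BackoffTiKVRPC, and Backoffer.GetTotalSleep are APIs introduced by the diff above.

package example

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/store/driver/backoff"
)

// withRetry is a hypothetical helper: it retries fn with the driver-level
// Backoffer until fn succeeds or the backoff budget is exhausted.
func withRetry(ctx context.Context, fn func() error) error {
	// 5000 ms is an arbitrary budget for this sketch; callers in store/copr
	// pass constants such as copNextMaxBackoff instead.
	bo := backoff.NewBackoffer(ctx, 5000)
	for {
		err := fn()
		if err == nil {
			return nil
		}
		// BackoffTiKVRPC sleeps and, once the total sleep time exceeds the
		// budget, returns an error already converted to the TiDB error domain.
		if berr := bo.BackoffTiKVRPC(err); berr != nil {
			return berr
		}
		fmt.Println("retrying, slept", bo.GetTotalSleep(), "ms so far")
	}
}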