store/driver: move backoff driver into single package so we can use it in tidb

Signed-off-by: shirly <[email protected]>
AndreMouche committed May 13, 2021
1 parent 7c8ddd8 commit e7d024a
Showing 8 changed files with 128 additions and 90 deletions.
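The caller-side change repeated throughout the store/copr hunks below is mechanical: each file imports the new github.com/pingcap/tidb/store/driver/backoff package and swaps the package-local constructors (newBackoffer / newBackofferWithVars) for the exported ones. A minimal sketch of that swap, not actual TiDB code — the constant value and the wrapper function are made up for illustration:

```go
package example

import (
	"context"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/store/driver/backoff"
)

// Assumed stand-in for the existing store/copr constant; the real value is not shown on this page.
const copBuildTaskMaxBackoff = 5000

func newCopBackoffer(ctx context.Context, vars *kv.Variables) *backoff.Backoffer {
	// Before this commit: bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
	// (an unexported wrapper defined in store/copr/store.go).
	// After this commit: the exported constructor from store/driver/backoff.
	return backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
}
```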
19 changes: 10 additions & 9 deletions store/copr/batch_coprocessor.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
@@ -97,7 +98,7 @@ type copTaskAndRPCContext struct {
ctx *tikv.RPCContext
}

func buildBatchCopTasks(bo *backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
start := time.Now()
const cmdType = tikvrpc.CmdBatchCop
rangesLen := ranges.Len()
@@ -178,7 +179,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
return copErrorResponse{errors.New("batch coprocessor cannot prove keep order or desc property")}
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
tasks, err := buildBatchCopTasks(bo, c.store.GetRegionCache(), ranges, req.StoreType)
if err != nil {
@@ -223,7 +224,7 @@ func (b *batchCopIterator) run(ctx context.Context) {
// We run workers for every batch cop.
for _, task := range b.tasks {
b.wg.Add(1)
bo := newBackofferWithVars(ctx, copNextMaxBackoff, b.vars)
bo := backoff.NewBackofferWithVars(ctx, copNextMaxBackoff, b.vars)
go b.handleTask(ctx, bo, task)
}
b.wg.Wait()
@@ -293,7 +294,7 @@ func (b *batchCopIterator) Close() error {
return nil
}

func (b *batchCopIterator) handleTask(ctx context.Context, bo *backoffer, task *batchCopTask) {
func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *batchCopTask) {
tasks := []*batchCopTask{task}
for idx := 0; idx < len(tasks); idx++ {
ret, err := b.handleTaskOnce(ctx, bo, tasks[idx])
@@ -308,7 +309,7 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *backoffer, task *
}

// Merge all ranges and request again.
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
var ranges []tikvstore.KeyRange
for _, taskCtx := range batchTask.copTasks {
taskCtx.task.ranges.Do(func(ran *tikvstore.KeyRange) {
@@ -318,7 +319,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoffer,
return buildBatchCopTasks(bo, b.store.GetRegionCache(), tikv.NewKeyRanges(ranges), b.req.StoreType)
}

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoffer, task *batchCopTask) ([]*batchCopTask, error) {
func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.copTasks))
for _, task := range task.copTasks {
@@ -363,7 +364,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoffer, ta
return nil, b.handleStreamedBatchCopResponse(ctx, bo, resp.Resp.(*tikvrpc.BatchCopStreamResponse), task)
}

func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, bo *backoffer, response *tikvrpc.BatchCopStreamResponse, task *batchCopTask) (err error) {
func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, bo *Backoffer, response *tikvrpc.BatchCopStreamResponse, task *batchCopTask) (err error) {
defer response.Close()
resp := response.BatchResponse
if resp == nil {
@@ -381,7 +382,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
return nil
}

if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
if err1 := bo.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
return errors.Trace(err)
}

@@ -396,7 +397,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
}
}

func (b *batchCopIterator) handleBatchCopResponse(bo *backoffer, response *coprocessor.BatchResponse, task *batchCopTask) (err error) {
func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *coprocessor.BatchResponse, task *batchCopTask) (err error) {
if otherErr := response.GetOtherError(); otherErr != "" {
err = errors.Errorf("other error: %s", otherErr)
logutil.BgLogger().Warn("other error",
4 changes: 2 additions & 2 deletions store/copr/batch_request_sender.go
@@ -38,7 +38,7 @@ func NewRegionBatchRequestSender(cache *tikv.RegionCache, client tikv.Client) *R
}
}

func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *backoffer, ctxs []copTaskAndRPCContext, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) {
func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []copTaskAndRPCContext, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) {
// use the first ctx to send request, because every ctx has same address.
cancel = func() {}
rpcCtx := ctxs[0].ctx
@@ -67,7 +67,7 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *backoffer, ctxs []co
return
}

func (ss *RegionBatchRequestSender) onSendFail(bo *backoffer, ctxs []copTaskAndRPCContext, err error) error {
func (ss *RegionBatchRequestSender) onSendFail(bo *Backoffer, ctxs []copTaskAndRPCContext, err error) error {
// If it failed because the context is cancelled by ourself, don't retry.
if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
return errors.Trace(err)
23 changes: 12 additions & 11 deletions store/copr/coprocessor.go
@@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
tidbmetrics "github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/logutil"
@@ -73,7 +74,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
return c.sendBatch(ctx, req, vars)
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req)
if err != nil {
@@ -144,7 +145,7 @@ func (r *copTask) String() string {
// rangesPerTask limits the length of the ranges slice sent in one copTask.
const rangesPerTask = 25000

func buildCopTasks(bo *backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) {
func buildCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) {
start := time.Now()
cmdType := tikvrpc.CmdCop
if req.Streaming {
@@ -605,12 +606,12 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {

// Associate each region with an independent backoffer. In this way, when multiple regions are
// unavailable, TiDB can execute very quickly without blocking
func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*backoffer, task *copTask, worker *copIteratorWorker) *backoffer {
func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*Backoffer, task *copTask, worker *copIteratorWorker) *Backoffer {
bo, ok := backoffermap[task.region.GetID()]
if ok {
return bo
}
newbo := newBackofferWithVars(ctx, copNextMaxBackoff, worker.vars)
newbo := backoff.NewBackofferWithVars(ctx, copNextMaxBackoff, worker.vars)
backoffermap[task.region.GetID()] = newbo
return newbo
}
Expand All @@ -629,7 +630,7 @@ func (worker *copIteratorWorker) handleTask(ctx context.Context, task *copTask,
}
}()
remainTasks := []*copTask{task}
backoffermap := make(map[uint64]*backoffer)
backoffermap := make(map[uint64]*Backoffer)
for len(remainTasks) > 0 {
curTask := remainTasks[0]
bo := chooseBackoffer(ctx, backoffermap, curTask, worker)
@@ -657,7 +658,7 @@ func (worker *copIteratorWorker) handleTask(ctx context.Context, task *copTask,

// handleTaskOnce handles single copTask, successful results are send to channel.
// If error happened, returns error. If region split or meet lock, returns the remain tasks.
func (worker *copIteratorWorker) handleTaskOnce(bo *backoffer, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
failpoint.Inject("handleTaskOnceError", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.New("mock handleTaskOnce error"))
@@ -747,7 +748,7 @@ const (
minLogKVProcessTime = 100
)

func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *copTask, bo *backoffer, resp *tikvrpc.Response) {
func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *copTask, bo *Backoffer, resp *tikvrpc.Response) {
logStr := fmt.Sprintf("[TIME_COP_PROCESS] resp_time:%s txnStartTS:%d region_id:%d store_addr:%s", costTime, worker.req.StartTs, task.region.GetID(), task.storeAddr)
if bo.GetTotalSleep() > minLogBackoffTime {
backoffTypes := strings.Replace(fmt.Sprintf("%v", bo.TiKVBackoffer().GetTypes()), " ", ",", -1)
@@ -809,7 +810,7 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan
return logStr
}

func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *tikv.RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *tikv.RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
defer stream.Close()
var resp *coprocessor.Response
var lastRange *coprocessor.KeyRange
@@ -833,7 +834,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *ti
if task.storeType == kv.TiFlash {
err1 = bo.Backoff(tikv.BoTiFlashRPC, err1)
} else {
err1 = bo.b.BackoffTiKVRPC(err1)
err1 = bo.BackoffTiKVRPC(err1)
}

if err1 != nil {
Expand All @@ -858,7 +859,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *ti
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
func (worker *copIteratorWorker) handleCopResponse(bo *backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) {
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) {
if regionErr := resp.pbResp.GetRegionError(); regionErr != nil {
if rpcCtx != nil && task.storeType == kv.TiDB {
resp.err = errors.Errorf("error: %v", regionErr)
@@ -1015,7 +1016,7 @@ func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask,
return nil
}

func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *backoffer, lastRange *coprocessor.KeyRange, task *copTask) ([]*copTask, error) {
func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRange *coprocessor.KeyRange, task *copTask) ([]*copTask, error) {
remainedRanges := task.ranges
if worker.req.Streaming && lastRange != nil {
remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
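The chooseBackoffer hunk above keeps one backoffer per region ID so that backing off against one unavailable region never stalls requests to the others; with this commit the map holds the exported *Backoffer type. A standalone sketch of that pattern under the new package, with simplified stand-in names (regionID and the constant value are illustrative, not the real copTask/worker plumbing):

```go
package example

import (
	"context"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/store/driver/backoff"
)

// Assumed stand-in for the store/copr constant; the real value is not shown on this page.
const copNextMaxBackoff = 20000

// chooseBackoffer returns the backoffer already bound to a region, creating one
// on first use. One backoffer per region keeps each region's retry/sleep budget
// independent, so a single slow region does not block progress on the rest.
func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*backoff.Backoffer, regionID uint64, vars *kv.Variables) *backoff.Backoffer {
	if bo, ok := backoffermap[regionID]; ok {
		return bo
	}
	newbo := backoff.NewBackofferWithVars(ctx, copNextMaxBackoff, vars)
	backoffermap[regionID] = newbo
	return newbo
}
```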
5 changes: 3 additions & 2 deletions store/copr/coprocessor_test.go
@@ -19,6 +19,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/mockstore/mocktikv"
@@ -43,7 +44,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
cache := tikv.NewRegionCache(pdCli)
defer cache.Close()

bo := newBackofferWithVars(context.Background(), 3000, nil)
bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)

req := &kv.Request{}
flashReq := &kv.Request{}
@@ -212,7 +213,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
pdCli := &tikv.CodecPDClient{Client: mocktikv.NewPDClient(cluster)}
cache := tikv.NewRegionCache(pdCli)
defer cache.Close()
bo := newBackofferWithVars(context.Background(), 3000, nil)
bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)

req := &kv.Request{}
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
13 changes: 7 additions & 6 deletions store/copr/mpp.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/logutil"
@@ -56,7 +57,7 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta {
// ConstructMPPTasks receives ScheduleRequest, which are actually collects of kv ranges. We allocates MPPTaskMeta for them and returns.
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) {
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil)
if req.KeyRanges == nil {
return c.selectAllTiFlashStore(), nil
}
@@ -152,7 +153,7 @@ func (m *mppIterator) run(ctx context.Context) {
break
}
m.wg.Add(1)
bo := newBackoffer(ctx, copNextMaxBackoff)
bo := backoff.NewBackoffer(ctx, copNextMaxBackoff)
go m.handleDispatchReq(ctx, bo, task)
}
m.wg.Wait()
@@ -176,7 +177,7 @@ func (m *mppIterator) sendToRespCh(resp *mppResponse) (exit bool) {
// TODO:: Consider that which way is better:
// - dispatch all tasks at once, and connect tasks at second.
// - dispatch tasks and establish connection at the same time.
func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *backoffer, req *kv.MPPDispatchRequest) {
func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req *kv.MPPDispatchRequest) {
defer func() {
m.wg.Done()
}()
@@ -299,7 +300,7 @@ func (m *mppIterator) cancelMppTasks() {
}
}

func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) {
func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) {
connReq := &mpp.EstablishMPPConnectionRequest{
SenderMeta: taskMeta,
ReceiverMeta: &mpp.TaskMeta{
@@ -343,7 +344,7 @@ func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchReques
return
}

if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if err1 := bo.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if errors.Cause(err) == context.Canceled {
logutil.BgLogger().Info("stream recv timeout", zap.Error(err))
} else {
@@ -366,7 +367,7 @@ func (m *mppIterator) Close() error {
return nil
}

func (m *mppIterator) handleMPPStreamResponse(bo *backoffer, response *mpp.MPPDataPacket, req *kv.MPPDispatchRequest) (err error) {
func (m *mppIterator) handleMPPStreamResponse(bo *Backoffer, response *mpp.MPPDataPacket, req *kv.MPPDispatchRequest) (err error) {
if response.Error != nil {
err = errors.Errorf("other error for mpp stream: %s", response.Error.Msg)
logutil.BgLogger().Warn("other error",
62 changes: 3 additions & 59 deletions store/copr/store.go
@@ -21,6 +21,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/config"
@@ -122,62 +123,5 @@ func getEndPointType(t kv.StoreType) tikvrpc.EndpointType {
}
}

// backoffer wraps tikv.Backoffer and converts the error which returns by the functions of tikv.Backoffer to tidb error.
type backoffer struct {
b *tikv.Backoffer
}

// newBackofferWithVars creates a Backoffer with maximum sleep time(in ms) and kv.Variables.
func newBackofferWithVars(ctx context.Context, maxSleep int, vars *kv.Variables) *backoffer {
b := tikv.NewBackofferWithVars(ctx, maxSleep, vars)
return &backoffer{b: b}
}

func newBackoffer(ctx context.Context, maxSleep int) *backoffer {
b := tikv.NewBackoffer(ctx, maxSleep)
return &backoffer{b: b}
}

// TiKVBackoffer returns tikv.Backoffer.
func (b *backoffer) TiKVBackoffer() *tikv.Backoffer {
return b.b
}

// Backoff sleeps a while base on the backoffType and records the error message.
// It returns a retryable error if total sleep time exceeds maxSleep.
func (b *backoffer) Backoff(typ tikv.BackoffType, err error) error {
e := b.b.Backoff(typ, err)
return derr.ToTiDBErr(e)
}

// BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message
// and never sleep more than maxSleepMs for each sleep.
func (b *backoffer) BackoffWithMaxSleep(typ tikv.BackoffType, maxSleepMs int, err error) error {
e := b.b.BackoffWithMaxSleep(typ, maxSleepMs, err)
return derr.ToTiDBErr(e)
}

// GetBackoffTimes returns a map contains backoff time count by type.
func (b *backoffer) GetBackoffTimes() map[tikv.BackoffType]int {
return b.b.GetBackoffTimes()
}

// GetCtx returns the binded context.
func (b *backoffer) GetCtx() context.Context {
return b.b.GetCtx()
}

// GetVars returns the binded vars.
func (b *backoffer) GetVars() *tikv.Variables {
return b.b.GetVars()
}

// GetBackoffSleepMS returns a map contains backoff sleep time by type.
func (b *backoffer) GetBackoffSleepMS() map[tikv.BackoffType]int {
return b.b.GetBackoffSleepMS()
}

// GetTotalSleep returns total sleep time.
func (b *backoffer) GetTotalSleep() int {
return b.b.GetTotalSleep()
}
// Backoffer wraps tikv.Backoffer and converts the error which returns by the functions of tikv.Backoffer to tidb error.
type Backoffer = backoff.Backoffer
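The new store/driver/backoff package itself is one of the two changed files whose diff is not expanded on this page, so the following is only a reconstruction of what it presumably contains, based on the wrapper deleted from store/copr/store.go above: the same tikv.Backoffer wrapper, now exported, still converting errors through store/driver/error. Method names and signatures are inferred from the call sites in the copr hunks (for example bo.BackoffTiKVRPC replacing bo.b.BackoffTiKVRPC), not copied from the real file, and only a representative subset is shown:

```go
package backoff

import (
	"context"

	"github.com/pingcap/tidb/kv"
	derr "github.com/pingcap/tidb/store/driver/error"
	"github.com/pingcap/tidb/store/tikv"
)

// Backoffer wraps tikv.Backoffer and converts the errors returned by
// tikv.Backoffer's methods into TiDB errors.
type Backoffer struct {
	b *tikv.Backoffer
}

// NewBackofferWithVars creates a Backoffer with a maximum sleep time (in ms) and kv.Variables.
func NewBackofferWithVars(ctx context.Context, maxSleep int, vars *kv.Variables) *Backoffer {
	return &Backoffer{b: tikv.NewBackofferWithVars(ctx, maxSleep, vars)}
}

// NewBackoffer creates a Backoffer with a maximum sleep time (in ms).
func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer {
	return &Backoffer{b: tikv.NewBackoffer(ctx, maxSleep)}
}

// TiKVBackoffer exposes the wrapped tikv.Backoffer.
func (b *Backoffer) TiKVBackoffer() *tikv.Backoffer { return b.b }

// Backoff sleeps a while based on the backoff type and records the error message.
// It returns a retryable error if the total sleep time exceeds maxSleep.
func (b *Backoffer) Backoff(typ tikv.BackoffType, err error) error {
	return derr.ToTiDBErr(b.b.Backoff(typ, err))
}

// BackoffTiKVRPC is assumed from the call sites above: callers now invoke it on the
// wrapper directly instead of reaching into bo.b, so the wrapper presumably forwards
// to tikv.Backoffer.BackoffTiKVRPC and converts the error.
func (b *Backoffer) BackoffTiKVRPC(err error) error {
	return derr.ToTiDBErr(b.b.BackoffTiKVRPC(err))
}

// GetTotalSleep returns the total sleep time in ms, as used by logTimeCopTask above.
func (b *Backoffer) GetTotalSleep() int { return b.b.GetTotalSleep() }
```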