store/driver: move backoff driver into single package so we can use i… #24624

Merged · 5 commits · May 13, 2021
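In short, this change deletes the package-local backoffer wrapper from store/copr/store.go, re-exports the shared type through a type alias, and switches every call site in store/copr to the constructors of the new store/driver/backoff package. A minimal sketch of the resulting call-site pattern, using a hypothetical helper name (newTaskBackoffer) and assuming the constructor signature shown in the diff below:

package copr

import (
	"context"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/store/driver/backoff"
)

// Backoffer is now an alias for the shared driver type (see store/copr/store.go below).
type Backoffer = backoff.Backoffer

// newTaskBackoffer is a hypothetical illustration of the new call-site pattern;
// real call sites (CopClient.Send, batchCopIterator.run, mppIterator.run, ...)
// pass copBuildTaskMaxBackoff or copNextMaxBackoff as maxSleepMs.
func newTaskBackoffer(ctx context.Context, maxSleepMs int, vars *kv.Variables) *Backoffer {
	// Previously: bo := newBackofferWithVars(ctx, maxSleepMs, vars)
	return backoff.NewBackofferWithVars(ctx, maxSleepMs, vars)
}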
19 changes: 10 additions & 9 deletions store/copr/batch_coprocessor.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
@@ -97,7 +98,7 @@ type copTaskAndRPCContext struct {
ctx *tikv.RPCContext
}

-func buildBatchCopTasks(bo *backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
+func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
start := time.Now()
const cmdType = tikvrpc.CmdBatchCop
rangesLen := ranges.Len()
@@ -178,7 +179,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
return copErrorResponse{errors.New("batch coprocessor cannot prove keep order or desc property")}
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
-bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
+bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
tasks, err := buildBatchCopTasks(bo, c.store.GetRegionCache(), ranges, req.StoreType)
if err != nil {
@@ -223,7 +224,7 @@ func (b *batchCopIterator) run(ctx context.Context) {
// We run workers for every batch cop.
for _, task := range b.tasks {
b.wg.Add(1)
-bo := newBackofferWithVars(ctx, copNextMaxBackoff, b.vars)
+bo := backoff.NewBackofferWithVars(ctx, copNextMaxBackoff, b.vars)
go b.handleTask(ctx, bo, task)
}
b.wg.Wait()
@@ -293,7 +294,7 @@ func (b *batchCopIterator) Close() error {
return nil
}

-func (b *batchCopIterator) handleTask(ctx context.Context, bo *backoffer, task *batchCopTask) {
+func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *batchCopTask) {
tasks := []*batchCopTask{task}
for idx := 0; idx < len(tasks); idx++ {
ret, err := b.handleTaskOnce(ctx, bo, tasks[idx])
@@ -308,7 +309,7 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *backoffer, task *
}

// Merge all ranges and request again.
-func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
+func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
var ranges []tikvstore.KeyRange
for _, taskCtx := range batchTask.copTasks {
taskCtx.task.ranges.Do(func(ran *tikvstore.KeyRange) {
@@ -318,7 +319,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoffer,
return buildBatchCopTasks(bo, b.store.GetRegionCache(), tikv.NewKeyRanges(ranges), b.req.StoreType)
}

-func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoffer, task *batchCopTask) ([]*batchCopTask, error) {
+func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.copTasks))
for _, task := range task.copTasks {
@@ -363,7 +364,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoffer, ta
return nil, b.handleStreamedBatchCopResponse(ctx, bo, resp.Resp.(*tikvrpc.BatchCopStreamResponse), task)
}

-func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, bo *backoffer, response *tikvrpc.BatchCopStreamResponse, task *batchCopTask) (err error) {
+func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, bo *Backoffer, response *tikvrpc.BatchCopStreamResponse, task *batchCopTask) (err error) {
defer response.Close()
resp := response.BatchResponse
if resp == nil {
@@ -381,7 +382,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
return nil
}

-if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
+if err1 := bo.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
return errors.Trace(err)
}

@@ -396,7 +397,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
}
}

-func (b *batchCopIterator) handleBatchCopResponse(bo *backoffer, response *coprocessor.BatchResponse, task *batchCopTask) (err error) {
+func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *coprocessor.BatchResponse, task *batchCopTask) (err error) {
if otherErr := response.GetOtherError(); otherErr != "" {
err = errors.Errorf("other error: %s", otherErr)
logutil.BgLogger().Warn("other error",
4 changes: 2 additions & 2 deletions store/copr/batch_request_sender.go
@@ -38,7 +38,7 @@ func NewRegionBatchRequestSender(cache *tikv.RegionCache, client tikv.Client) *R
}
}

-func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *backoffer, ctxs []copTaskAndRPCContext, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) {
+func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []copTaskAndRPCContext, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) {
// use the first ctx to send request, because every ctx has same address.
cancel = func() {}
rpcCtx := ctxs[0].ctx
@@ -67,7 +67,7 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *backoffer, ctxs []co
return
}

-func (ss *RegionBatchRequestSender) onSendFail(bo *backoffer, ctxs []copTaskAndRPCContext, err error) error {
+func (ss *RegionBatchRequestSender) onSendFail(bo *Backoffer, ctxs []copTaskAndRPCContext, err error) error {
// If it failed because the context is cancelled by ourself, don't retry.
if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
return errors.Trace(err)
23 changes: 12 additions & 11 deletions store/copr/coprocessor.go
@@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
tidbmetrics "github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/logutil"
@@ -73,7 +74,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
return c.sendBatch(ctx, req, vars)
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
-bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
+bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req)
if err != nil {
@@ -144,7 +145,7 @@ func (r *copTask) String() string {
// rangesPerTask limits the length of the ranges slice sent in one copTask.
const rangesPerTask = 25000

-func buildCopTasks(bo *backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) {
+func buildCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) {
start := time.Now()
cmdType := tikvrpc.CmdCop
if req.Streaming {
@@ -605,12 +606,12 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {

// Associate each region with an independent backoffer. In this way, when multiple regions are
// unavailable, TiDB can execute very quickly without blocking
-func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*backoffer, task *copTask, worker *copIteratorWorker) *backoffer {
+func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*Backoffer, task *copTask, worker *copIteratorWorker) *Backoffer {
bo, ok := backoffermap[task.region.GetID()]
if ok {
return bo
}
-newbo := newBackofferWithVars(ctx, copNextMaxBackoff, worker.vars)
+newbo := backoff.NewBackofferWithVars(ctx, copNextMaxBackoff, worker.vars)
backoffermap[task.region.GetID()] = newbo
return newbo
}
@@ -629,7 +630,7 @@ func (worker *copIteratorWorker) handleTask(ctx context.Context, task *copTask,
}
}()
remainTasks := []*copTask{task}
-backoffermap := make(map[uint64]*backoffer)
+backoffermap := make(map[uint64]*Backoffer)
for len(remainTasks) > 0 {
curTask := remainTasks[0]
bo := chooseBackoffer(ctx, backoffermap, curTask, worker)
@@ -657,7 +658,7 @@

// handleTaskOnce handles single copTask, successful results are send to channel.
// If error happened, returns error. If region split or meet lock, returns the remain tasks.
-func (worker *copIteratorWorker) handleTaskOnce(bo *backoffer, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
+func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
failpoint.Inject("handleTaskOnceError", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.New("mock handleTaskOnce error"))
@@ -747,7 +748,7 @@ const (
minLogKVProcessTime = 100
)

-func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *copTask, bo *backoffer, resp *tikvrpc.Response) {
+func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *copTask, bo *Backoffer, resp *tikvrpc.Response) {
logStr := fmt.Sprintf("[TIME_COP_PROCESS] resp_time:%s txnStartTS:%d region_id:%d store_addr:%s", costTime, worker.req.StartTs, task.region.GetID(), task.storeAddr)
if bo.GetTotalSleep() > minLogBackoffTime {
backoffTypes := strings.Replace(fmt.Sprintf("%v", bo.TiKVBackoffer().GetTypes()), " ", ",", -1)
@@ -809,7 +810,7 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan
return logStr
}

-func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *tikv.RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
+func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *tikv.RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
defer stream.Close()
var resp *coprocessor.Response
var lastRange *coprocessor.KeyRange
@@ -833,7 +834,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *ti
if task.storeType == kv.TiFlash {
err1 = bo.Backoff(tikv.BoTiFlashRPC, err1)
} else {
-err1 = bo.b.BackoffTiKVRPC(err1)
+err1 = bo.BackoffTiKVRPC(err1)
}

if err1 != nil {
@@ -858,7 +859,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *ti
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
-func (worker *copIteratorWorker) handleCopResponse(bo *backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) {
+func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) {
if regionErr := resp.pbResp.GetRegionError(); regionErr != nil {
if rpcCtx != nil && task.storeType == kv.TiDB {
resp.err = errors.Errorf("error: %v", regionErr)
@@ -1015,7 +1016,7 @@ func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask,
return nil
}

-func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *backoffer, lastRange *coprocessor.KeyRange, task *copTask) ([]*copTask, error) {
+func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRange *coprocessor.KeyRange, task *copTask) ([]*copTask, error) {
remainedRanges := task.ranges
if worker.req.Streaming && lastRange != nil {
remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
5 changes: 3 additions & 2 deletions store/copr/coprocessor_test.go
@@ -19,6 +19,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/mockstore/mocktikv"
@@ -43,7 +44,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
cache := tikv.NewRegionCache(pdCli)
defer cache.Close()

-bo := newBackofferWithVars(context.Background(), 3000, nil)
+bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)

req := &kv.Request{}
flashReq := &kv.Request{}
@@ -212,7 +213,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
pdCli := &tikv.CodecPDClient{Client: mocktikv.NewPDClient(cluster)}
cache := tikv.NewRegionCache(pdCli)
defer cache.Close()
-bo := newBackofferWithVars(context.Background(), 3000, nil)
+bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)

req := &kv.Request{}
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
13 changes: 7 additions & 6 deletions store/copr/mpp.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/logutil"
@@ -56,7 +57,7 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta {
// ConstructMPPTasks receives ScheduleRequest, which are actually collects of kv ranges. We allocates MPPTaskMeta for them and returns.
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) {
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS)
-bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil)
+bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil)
if req.KeyRanges == nil {
return c.selectAllTiFlashStore(), nil
}
@@ -152,7 +153,7 @@ func (m *mppIterator) run(ctx context.Context) {
break
}
m.wg.Add(1)
-bo := newBackoffer(ctx, copNextMaxBackoff)
+bo := backoff.NewBackoffer(ctx, copNextMaxBackoff)
go m.handleDispatchReq(ctx, bo, task)
}
m.wg.Wait()
@@ -176,7 +177,7 @@ func (m *mppIterator) sendToRespCh(resp *mppResponse) (exit bool) {
// TODO:: Consider that which way is better:
// - dispatch all tasks at once, and connect tasks at second.
// - dispatch tasks and establish connection at the same time.
-func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *backoffer, req *kv.MPPDispatchRequest) {
+func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req *kv.MPPDispatchRequest) {
defer func() {
m.wg.Done()
}()
@@ -299,7 +300,7 @@ func (m *mppIterator) cancelMppTasks() {
}
}

-func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) {
+func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) {
connReq := &mpp.EstablishMPPConnectionRequest{
SenderMeta: taskMeta,
ReceiverMeta: &mpp.TaskMeta{
@@ -343,7 +344,7 @@ func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchReques
return
}

-if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil {
+if err1 := bo.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if errors.Cause(err) == context.Canceled {
logutil.BgLogger().Info("stream recv timeout", zap.Error(err))
} else {
@@ -366,7 +367,7 @@ func (m *mppIterator) Close() error {
return nil
}

-func (m *mppIterator) handleMPPStreamResponse(bo *backoffer, response *mpp.MPPDataPacket, req *kv.MPPDispatchRequest) (err error) {
+func (m *mppIterator) handleMPPStreamResponse(bo *Backoffer, response *mpp.MPPDataPacket, req *kv.MPPDispatchRequest) (err error) {
if response.Error != nil {
err = errors.Errorf("other error for mpp stream: %s", response.Error.Msg)
logutil.BgLogger().Warn("other error",
62 changes: 3 additions & 59 deletions store/copr/store.go
@@ -21,6 +21,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/config"
@@ -122,62 +123,5 @@ func getEndPointType(t kv.StoreType) tikvrpc.EndpointType {
}
}

-// backoffer wraps tikv.Backoffer and converts the error which returns by the functions of tikv.Backoffer to tidb error.
-type backoffer struct {
-b *tikv.Backoffer
-}
-
-// newBackofferWithVars creates a Backoffer with maximum sleep time(in ms) and kv.Variables.
-func newBackofferWithVars(ctx context.Context, maxSleep int, vars *kv.Variables) *backoffer {
-b := tikv.NewBackofferWithVars(ctx, maxSleep, vars)
-return &backoffer{b: b}
-}
-
-func newBackoffer(ctx context.Context, maxSleep int) *backoffer {
-b := tikv.NewBackoffer(ctx, maxSleep)
-return &backoffer{b: b}
-}
-
-// TiKVBackoffer returns tikv.Backoffer.
-func (b *backoffer) TiKVBackoffer() *tikv.Backoffer {
-return b.b
-}
-
-// Backoff sleeps a while base on the backoffType and records the error message.
-// It returns a retryable error if total sleep time exceeds maxSleep.
-func (b *backoffer) Backoff(typ tikv.BackoffType, err error) error {
-e := b.b.Backoff(typ, err)
-return derr.ToTiDBErr(e)
-}
-
-// BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message
-// and never sleep more than maxSleepMs for each sleep.
-func (b *backoffer) BackoffWithMaxSleep(typ tikv.BackoffType, maxSleepMs int, err error) error {
-e := b.b.BackoffWithMaxSleep(typ, maxSleepMs, err)
-return derr.ToTiDBErr(e)
-}
-
-// GetBackoffTimes returns a map contains backoff time count by type.
-func (b *backoffer) GetBackoffTimes() map[tikv.BackoffType]int {
-return b.b.GetBackoffTimes()
-}
-
-// GetCtx returns the binded context.
-func (b *backoffer) GetCtx() context.Context {
-return b.b.GetCtx()
-}
-
-// GetVars returns the binded vars.
-func (b *backoffer) GetVars() *tikv.Variables {
-return b.b.GetVars()
-}
-
-// GetBackoffSleepMS returns a map contains backoff sleep time by type.
-func (b *backoffer) GetBackoffSleepMS() map[tikv.BackoffType]int {
-return b.b.GetBackoffSleepMS()
-}
-
-// GetTotalSleep returns total sleep time.
-func (b *backoffer) GetTotalSleep() int {
-return b.b.GetTotalSleep()
-}
+// Backoffer wraps tikv.Backoffer and converts the error which returns by the functions of tikv.Backoffer to tidb error.
+type Backoffer = backoff.Backoffer
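The relocated package itself is not shown in this diff. A minimal sketch of what store/driver/backoff presumably exports, reconstructed from the wrapper removed above and from the call-site changes (for example, bo.b.BackoffTiKVRPC becoming bo.BackoffTiKVRPC); the actual package may differ in detail:

// Package backoff -- assumed shape, mirroring the copr-local wrapper deleted above.
package backoff

import (
	"context"

	"github.com/pingcap/tidb/kv"
	derr "github.com/pingcap/tidb/store/driver/error"
	"github.com/pingcap/tidb/store/tikv"
)

// Backoffer wraps tikv.Backoffer and converts the errors it returns to TiDB errors.
type Backoffer struct {
	b *tikv.Backoffer
}

// NewBackofferWithVars creates a Backoffer with a maximum sleep time (in ms) and kv.Variables.
func NewBackofferWithVars(ctx context.Context, maxSleep int, vars *kv.Variables) *Backoffer {
	return &Backoffer{b: tikv.NewBackofferWithVars(ctx, maxSleep, vars)}
}

// NewBackoffer creates a Backoffer with a maximum sleep time (in ms).
func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer {
	return &Backoffer{b: tikv.NewBackoffer(ctx, maxSleep)}
}

// Backoff sleeps based on the backoff type, records the error message, and
// converts the returned error to a TiDB error.
func (b *Backoffer) Backoff(typ tikv.BackoffType, err error) error {
	return derr.ToTiDBErr(b.b.Backoff(typ, err))
}

// BackoffTiKVRPC is assumed to forward to the embedded tikv.Backoffer, since
// call sites in this PR change from bo.b.BackoffTiKVRPC(...) to bo.BackoffTiKVRPC(...).
func (b *Backoffer) BackoffTiKVRPC(err error) error {
	return derr.ToTiDBErr(b.b.BackoffTiKVRPC(err))
}

// TiKVBackoffer exposes the underlying tikv.Backoffer; the remaining getters
// (GetVars, GetCtx, GetBackoffTimes, ...) presumably carry over unchanged from
// the methods previously defined on the unexported backoffer.
func (b *Backoffer) TiKVBackoffer() *tikv.Backoffer { return b.b }

// GetTotalSleep returns the total sleep time in milliseconds.
func (b *Backoffer) GetTotalSleep() int { return b.b.GetTotalSleep() }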