store/tikv: prepare for moving coprocessor out (#22732)
Signed-off-by: disksing <[email protected]>
disksing authored Feb 18, 2021
1 parent 9374724 commit a06c22a
Showing 11 changed files with 254 additions and 181 deletions.
14 changes: 8 additions & 6 deletions store/tikv/2pc.go
@@ -39,6 +39,7 @@ import (
     "github.com/pingcap/tidb/store/tikv/metrics"
     "github.com/pingcap/tidb/store/tikv/oracle"
     "github.com/pingcap/tidb/store/tikv/tikvrpc"
+    "github.com/pingcap/tidb/store/tikv/util"
     "github.com/pingcap/tidb/table/tables"
     "github.com/pingcap/tidb/tablecodec"
     "github.com/pingcap/tidb/types"
@@ -1698,7 +1699,7 @@ func (b *batched) forgetPrimary() {
 // batchExecutor is txn controller providing rate control like utils
 type batchExecutor struct {
     rateLim int // concurrent worker numbers
-    rateLimiter *rateLimit // rate limiter for concurrency control, maybe more strategies
+    rateLimiter *util.RateLimit // rate limiter for concurrency control, maybe more strategies
     committer *twoPhaseCommitter // here maybe more different type committer in the future
     action twoPhaseCommitAction // the work action type
     backoffer *Backoffer // Backoffer
@@ -1715,19 +1716,19 @@ func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter,
 // initUtils do initialize batchExecutor related policies like rateLimit util
 func (batchExe *batchExecutor) initUtils() error {
     // init rateLimiter by injected rate limit number
-    batchExe.rateLimiter = newRateLimit(batchExe.rateLim)
+    batchExe.rateLimiter = util.NewRateLimit(batchExe.rateLim)
     return nil
 }

 // startWork concurrently do the work for each batch considering rate limit
 func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error, batches []batchMutations) {
     for idx, batch1 := range batches {
         waitStart := time.Now()
-        if exit := batchExe.rateLimiter.getToken(exitCh); !exit {
+        if exit := batchExe.rateLimiter.GetToken(exitCh); !exit {
             batchExe.tokenWaitDuration += time.Since(waitStart)
             batch := batch1
             go func() {
-                defer batchExe.rateLimiter.putToken()
+                defer batchExe.rateLimiter.PutToken()
                 var singleBatchBackoffer *Backoffer
                 if _, ok := batchExe.action.(actionCommit); ok {
                     // Because the secondary batches of the commit actions are implemented to be
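The only behavioral point of this hunk is that the committer now takes its concurrency limiter from the shared util package through the exported GetToken/PutToken API. util.RateLimit itself is not part of this diff; the following is a minimal sketch of a channel-based token limiter consistent with the call sites above (GetToken returning true when the exit channel fires). The field names and the panic message are assumptions, not the actual implementation.

package util

// RateLimit is a sketch of a semaphore-style limiter matching the
// GetToken/PutToken calls in batchExecutor (assumed implementation).
type RateLimit struct {
    capacity int
    token    chan struct{}
}

// NewRateLimit creates a limiter that allows at most n concurrent holders.
func NewRateLimit(n int) *RateLimit {
    return &RateLimit{capacity: n, token: make(chan struct{}, n)}
}

// GetToken blocks until a token is available or done is closed. It returns
// true only when the caller should exit without doing work, which matches
// `if exit := rateLimiter.GetToken(exitCh); !exit { ... }` above.
func (r *RateLimit) GetToken(done <-chan struct{}) (exit bool) {
    select {
    case <-done:
        return true
    case r.token <- struct{}{}:
        return false
    }
}

// PutToken releases a token previously acquired by GetToken.
func (r *RateLimit) PutToken() {
    select {
    case <-r.token:
    default:
        panic("put a redundant token")
    }
}

Under this design a buffered channel of capacity rateLim acts as a counting semaphore, so at most rateLim batch goroutines run at once.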
@@ -1812,7 +1813,7 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error {
 
 func getTxnPriority(txn *tikvTxn) pb.CommandPri {
     if pri := txn.us.GetOption(kv.Priority); pri != nil {
-        return kvPriorityToCommandPri(pri.(int))
+        return PriorityToPB(pri.(int))
     }
     return pb.CommandPri_Normal
 }
@@ -1824,7 +1825,8 @@ func getTxnSyncLog(txn *tikvTxn) bool {
     return false
 }
 
-func kvPriorityToCommandPri(pri int) pb.CommandPri {
+// PriorityToPB converts priority type to wire type.
+func PriorityToPB(pri int) pb.CommandPri {
     switch pri {
     case kv.PriorityLow:
         return pb.CommandPri_Low
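The hunk is cut off after the PriorityLow case. Given the kv priority constants and the pb.CommandPri values already used in getTxnPriority, the exported converter presumably completes the switch roughly as follows. This is a sketch relying on the pb and kv imports already present in 2pc.go, not the verbatim function body.

// PriorityToPB converts priority type to wire type (sketch of the full
// switch; only the PriorityLow case is visible in the truncated hunk).
func PriorityToPB(pri int) pb.CommandPri {
    switch pri {
    case kv.PriorityLow:
        return pb.CommandPri_Low
    case kv.PriorityHigh:
        return pb.CommandPri_High
    default:
        return pb.CommandPri_Normal
    }
}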
28 changes: 12 additions & 16 deletions store/tikv/batch_coprocessor.go
@@ -29,6 +29,7 @@ import (
     "github.com/pingcap/tidb/store/tikv/logutil"
     "github.com/pingcap/tidb/store/tikv/metrics"
     "github.com/pingcap/tidb/store/tikv/tikvrpc"
+    "github.com/pingcap/tidb/store/tikv/util"
     "github.com/pingcap/tidb/util/memory"
     "go.uber.org/zap"
 )
@@ -187,18 +188,13 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *kv.Var
         return copErrorResponse{err}
     }
     it := &batchCopIterator{
-        store: c.store,
-        req: req,
-        finishCh: make(chan struct{}),
-        vars: vars,
-        memTracker: req.MemTracker,
-        clientHelper: clientHelper{
-            LockResolver: c.store.GetLockResolver(),
-            RegionCache: c.store.GetRegionCache(),
-            Client: c.store.GetTiKVClient(),
-            minCommitTSPushed: &minCommitTSPushed{data: make(map[uint64]struct{}, 5)},
-        },
-        rpcCancel: NewRPCanceller(),
+        store: c.store,
+        req: req,
+        finishCh: make(chan struct{}),
+        vars: vars,
+        memTracker: req.MemTracker,
+        ClientHelper: NewClientHelper(c.store, util.NewTSSet(5)),
+        rpcCancel: NewRPCanceller(),
     }
     ctx = context.WithValue(ctx, RPCCancellerCtxKey{}, it.rpcCancel)
     it.tasks = tasks
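The hand-built clientHelper literal, including its minCommitTSPushed map with a capacity hint of 5, collapses into NewClientHelper(c.store, util.NewTSSet(5)). util.TSSet is not shown in this diff; it presumably takes over the role of the old map[uint64]struct{} as a concurrency-safe set of timestamps. A plausible self-contained sketch, with the Put/GetAll method names assumed rather than taken from the diff:

package util

import "sync"

// TSSet is a sketch of a concurrency-safe timestamp set that could back
// NewClientHelper(c.store, util.NewTSSet(5)) above.
type TSSet struct {
    sync.RWMutex
    m map[uint64]struct{}
}

// NewTSSet creates a set with an initial capacity hint.
func NewTSSet(capacity int) *TSSet {
    return &TSSet{m: make(map[uint64]struct{}, capacity)}
}

// Put adds timestamps to the set.
func (s *TSSet) Put(tss ...uint64) {
    s.Lock()
    defer s.Unlock()
    for _, ts := range tss {
        s.m[ts] = struct{}{}
    }
}

// GetAll returns a snapshot of all timestamps in the set.
func (s *TSSet) GetAll() []uint64 {
    s.RLock()
    defer s.RUnlock()
    if len(s.m) == 0 {
        return nil
    }
    arr := make([]uint64, 0, len(s.m))
    for ts := range s.m {
        arr = append(arr, ts)
    }
    return arr
}

Constructing the helper from the store plus an injected set keeps the resolved-timestamp state behind an exported API, which fits the commit's stated goal of preparing the coprocessor client to move out of this package.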
@@ -208,7 +204,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *kv.Var
 }
 
 type batchCopIterator struct {
-    clientHelper
+    *ClientHelper
 
     store *KVStore
     req *kv.Request
@@ -328,7 +324,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer,
             ranges = append(ranges, *ran)
         })
     }
-    return buildBatchCopTasks(bo, b.RegionCache, NewKeyRanges(ranges), b.req.StoreType)
+    return buildBatchCopTasks(bo, b.regionCache, NewKeyRanges(ranges), b.req.StoreType)
 }
 
 func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
@@ -354,8 +350,8 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta
     }
 
     req := tikvrpc.NewRequest(task.cmdType, &copReq, kvrpcpb.Context{
-        IsolationLevel: pbIsolationLevel(b.req.IsolationLevel),
-        Priority: kvPriorityToCommandPri(b.req.Priority),
+        IsolationLevel: IsolationLevelToPB(b.req.IsolationLevel),
+        Priority: PriorityToPB(b.req.Priority),
         NotFillCache: b.req.NotFillCache,
         RecordTimeStat: true,
         RecordScanStat: true,
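Alongside the Priority change, pbIsolationLevel becomes the exported IsolationLevelToPB, mirroring the PriorityToPB rename in 2pc.go. Its body is outside this diff; assuming the usual kv.IsoLevel values and the kvrpcpb import already present in this file, the mapping is presumably along these lines (the default branch is an assumption):

// IsolationLevelToPB converts an isolation level to its wire representation
// (sketch; treating anything other than kv.RC as snapshot isolation is assumed).
func IsolationLevelToPB(level kv.IsoLevel) kvrpcpb.IsolationLevel {
    switch level {
    case kv.RC:
        return kvrpcpb.IsolationLevel_RC
    default:
        return kvrpcpb.IsolationLevel_SI
    }
}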