diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 9be5ca65250cf..c4aeb38719923 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/store/tikv/metrics" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" + "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -1698,7 +1699,7 @@ func (b *batched) forgetPrimary() { // batchExecutor is txn controller providing rate control like utils type batchExecutor struct { rateLim int // concurrent worker numbers - rateLimiter *rateLimit // rate limiter for concurrency control, maybe more strategies + rateLimiter *util.RateLimit // rate limiter for concurrency control, maybe more strategies committer *twoPhaseCommitter // here maybe more different type committer in the future action twoPhaseCommitAction // the work action type backoffer *Backoffer // Backoffer @@ -1715,7 +1716,7 @@ func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter, // initUtils do initialize batchExecutor related policies like rateLimit util func (batchExe *batchExecutor) initUtils() error { // init rateLimiter by injected rate limit number - batchExe.rateLimiter = newRateLimit(batchExe.rateLim) + batchExe.rateLimiter = util.NewRateLimit(batchExe.rateLim) return nil } @@ -1723,11 +1724,11 @@ func (batchExe *batchExecutor) initUtils() error { func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error, batches []batchMutations) { for idx, batch1 := range batches { waitStart := time.Now() - if exit := batchExe.rateLimiter.getToken(exitCh); !exit { + if exit := batchExe.rateLimiter.GetToken(exitCh); !exit { batchExe.tokenWaitDuration += time.Since(waitStart) batch := batch1 go func() { - defer batchExe.rateLimiter.putToken() + defer batchExe.rateLimiter.PutToken() var singleBatchBackoffer *Backoffer if _, ok := batchExe.action.(actionCommit); ok { // Because the secondary batches of the commit actions are implemented to be @@ -1812,7 +1813,7 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error { func getTxnPriority(txn *tikvTxn) pb.CommandPri { if pri := txn.us.GetOption(kv.Priority); pri != nil { - return kvPriorityToCommandPri(pri.(int)) + return PriorityToPB(pri.(int)) } return pb.CommandPri_Normal } @@ -1824,7 +1825,8 @@ func getTxnSyncLog(txn *tikvTxn) bool { return false } -func kvPriorityToCommandPri(pri int) pb.CommandPri { +// PriorityToPB converts priority type to wire type. 
+func PriorityToPB(pri int) pb.CommandPri { switch pri { case kv.PriorityLow: return pb.CommandPri_Low diff --git a/store/tikv/batch_coprocessor.go b/store/tikv/batch_coprocessor.go index ba597fa346981..4a49c82cfe575 100644 --- a/store/tikv/batch_coprocessor.go +++ b/store/tikv/batch_coprocessor.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" + "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" ) @@ -187,18 +188,13 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *kv.Var return copErrorResponse{err} } it := &batchCopIterator{ - store: c.store, - req: req, - finishCh: make(chan struct{}), - vars: vars, - memTracker: req.MemTracker, - clientHelper: clientHelper{ - LockResolver: c.store.GetLockResolver(), - RegionCache: c.store.GetRegionCache(), - Client: c.store.GetTiKVClient(), - minCommitTSPushed: &minCommitTSPushed{data: make(map[uint64]struct{}, 5)}, - }, - rpcCancel: NewRPCanceller(), + store: c.store, + req: req, + finishCh: make(chan struct{}), + vars: vars, + memTracker: req.MemTracker, + ClientHelper: NewClientHelper(c.store, util.NewTSSet(5)), + rpcCancel: NewRPCanceller(), } ctx = context.WithValue(ctx, RPCCancellerCtxKey{}, it.rpcCancel) it.tasks = tasks @@ -208,7 +204,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *kv.Var } type batchCopIterator struct { - clientHelper + *ClientHelper store *KVStore req *kv.Request @@ -328,7 +324,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, ranges = append(ranges, *ran) }) } - return buildBatchCopTasks(bo, b.RegionCache, NewKeyRanges(ranges), b.req.StoreType) + return buildBatchCopTasks(bo, b.regionCache, NewKeyRanges(ranges), b.req.StoreType) } func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) { @@ -354,8 +350,8 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta } req := tikvrpc.NewRequest(task.cmdType, &copReq, kvrpcpb.Context{ - IsolationLevel: pbIsolationLevel(b.req.IsolationLevel), - Priority: kvPriorityToCommandPri(b.req.Priority), + IsolationLevel: IsolationLevelToPB(b.req.IsolationLevel), + Priority: PriorityToPB(b.req.Priority), NotFillCache: b.req.NotFillCache, RecordTimeStat: true, RecordScanStat: true, diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index e8d9fa71cc6d9..e00890f7e634a 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -38,6 +38,7 @@ import ( "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" + "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tipb/go-tipb" @@ -74,8 +75,8 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable memTracker: req.MemTracker, replicaReadSeed: c.replicaReadSeed, rpcCancel: NewRPCanceller(), + resolvedLocks: util.NewTSSet(5), } - it.minCommitTSPushed.data = make(map[uint64]struct{}, 5) it.tasks = tasks if it.concurrency > len(tasks) { it.concurrency = len(tasks) @@ -86,7 +87,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable } if it.req.KeepOrder { - it.sendRate = newRateLimit(2 * it.concurrency) + it.sendRate = util.NewRateLimit(2 * it.concurrency) 
it.respChan = nil } else { capacity := it.concurrency @@ -97,9 +98,9 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable capacity = it.concurrency * 2 } it.respChan = make(chan *copResponse, capacity) - it.sendRate = newRateLimit(it.concurrency) + it.sendRate = util.NewRateLimit(it.concurrency) } - it.actionOnExceed = newRateLimitAction(uint(cap(it.sendRate.token))) + it.actionOnExceed = newRateLimitAction(uint(it.sendRate.GetCapacity())) if sessionMemTracker != nil { sessionMemTracker.FallbackOldAndSetNewAction(it.actionOnExceed) } @@ -226,7 +227,7 @@ type copIterator struct { curr int // sendRate controls the sending rate of copIteratorTaskSender - sendRate *rateLimit + sendRate *util.RateLimit // Otherwise, results are stored in respChan. respChan chan *copResponse @@ -245,7 +246,7 @@ type copIterator struct { // when the Close is called. we use atomic.CompareAndSwap `closed` to to make sure the channel is not closed twice. closed uint32 - minCommitTSPushed + resolvedLocks *util.TSSet actionOnExceed *rateLimitAction } @@ -259,7 +260,7 @@ type copIteratorWorker struct { respChan chan<- *copResponse finishCh <-chan struct{} vars *kv.Variables - clientHelper + *ClientHelper memTracker *memory.Tracker @@ -275,7 +276,7 @@ type copIteratorTaskSender struct { tasks []*copTask finishCh <-chan struct{} respChan chan<- *copResponse - sendRate *rateLimit + sendRate *util.RateLimit } type copResponse struct { @@ -381,19 +382,14 @@ func (it *copIterator) open(ctx context.Context, enabledRateLimitAction bool) { // Start it.concurrency number of workers to handle cop requests. for i := 0; i < it.concurrency; i++ { worker := &copIteratorWorker{ - taskCh: taskCh, - wg: &it.wg, - store: it.store, - req: it.req, - respChan: it.respChan, - finishCh: it.finishCh, - vars: it.vars, - clientHelper: clientHelper{ - LockResolver: it.store.GetLockResolver(), - RegionCache: it.store.GetRegionCache(), - minCommitTSPushed: &it.minCommitTSPushed, - Client: it.store.GetTiKVClient(), - }, + taskCh: taskCh, + wg: &it.wg, + store: it.store, + req: it.req, + respChan: it.respChan, + finishCh: it.finishCh, + vars: it.vars, + ClientHelper: NewClientHelper(it.store, it.resolvedLocks), memTracker: it.memTracker, replicaReadSeed: it.replicaReadSeed, actionOnExceed: it.actionOnExceed, @@ -426,7 +422,7 @@ func (sender *copIteratorTaskSender) run() { // We keep the number of inflight tasks within the number of 2 * concurrency when Keep Order is true. // If KeepOrder is false, the number equals the concurrency. // It sends one more task if a task has been finished in copIterator.Next. - exit := sender.sendRate.getToken(sender.finishCh) + exit := sender.sendRate.GetToken(sender.finishCh) if exit { break } @@ -554,7 +550,7 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) { } if resp == finCopResp { it.actionOnExceed.destroyTokenIfNeeded(func() { - it.sendRate.putToken() + it.sendRate.PutToken() }) return it.Next(ctx) } @@ -575,7 +571,7 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) { break } it.actionOnExceed.destroyTokenIfNeeded(func() { - it.sendRate.putToken() + it.sendRate.PutToken() }) // Switch to next task. 
it.tasks[it.curr] = nil @@ -674,7 +670,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch cacheKey = cKey cValue := worker.store.coprCache.Get(cKey) copReq.IsCacheEnabled = true - if cValue != nil && cValue.RegionID == task.region.id && cValue.TimeStamp <= worker.req.StartTs { + if cValue != nil && cValue.RegionID == task.region.GetID() && cValue.TimeStamp <= worker.req.StartTs { // Append cache version to the request to skip Coprocessor computation if possible // when request result is cached copReq.CacheIfMatchVersion = cValue.RegionDataVersion @@ -688,8 +684,8 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch } req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, worker.req.ReplicaRead, &worker.replicaReadSeed, kvrpcpb.Context{ - IsolationLevel: pbIsolationLevel(worker.req.IsolationLevel), - Priority: kvPriorityToCommandPri(worker.req.Priority), + IsolationLevel: IsolationLevelToPB(worker.req.IsolationLevel), + Priority: PriorityToPB(worker.req.Priority), NotFillCache: worker.req.NotFillCache, RecordTimeStat: true, RecordScanStat: true, @@ -725,51 +721,34 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, cacheKey, cacheValue, task, ch, nil, costTime) } -type minCommitTSPushed struct { - data map[uint64]struct{} - sync.RWMutex -} - -func (m *minCommitTSPushed) Update(from []uint64) { - m.Lock() - for _, v := range from { - m.data[v] = struct{}{} - } - m.Unlock() -} - -func (m *minCommitTSPushed) Get() []uint64 { - m.RLock() - defer m.RUnlock() - if len(m.data) == 0 { - return nil - } - - ret := make([]uint64, 0, len(m.data)) - for k := range m.data { - ret = append(ret, k) - } - return ret -} - -// clientHelper wraps LockResolver and RegionRequestSender. +// ClientHelper wraps LockResolver and RegionRequestSender. // It's introduced to support the new lock resolving pattern in the large transaction. // In the large transaction protocol, sending requests and resolving locks are // context-dependent. For example, when a send request meets a secondary lock, we'll // call ResolveLock, and if the lock belongs to a large transaction, we may retry // the request. If there is no context information about the resolved locks, we'll // meet the secondary lock again and run into a deadloop. -type clientHelper struct { - *LockResolver - *RegionCache - *minCommitTSPushed - Client - resolveLite bool +type ClientHelper struct { + lockResolver *LockResolver + regionCache *RegionCache + resolvedLocks *util.TSSet + client Client + resolveLite bool RegionRequestRuntimeStats } +// NewClientHelper creates a helper instance. +func NewClientHelper(store *KVStore, resolvedLocks *util.TSSet) *ClientHelper { + return &ClientHelper{ + lockResolver: store.GetLockResolver(), + regionCache: store.GetRegionCache(), + resolvedLocks: resolvedLocks, + client: store.GetTiKVClient(), + } +} + // ResolveLocks wraps the ResolveLocks function and store the resolved result. 
-func (ch *clientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) { +func (ch *ClientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) { var err error var resolvedLocks []uint64 var msBeforeTxnExpired int64 @@ -779,28 +758,28 @@ func (ch *clientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks }(time.Now()) } if ch.resolveLite { - msBeforeTxnExpired, resolvedLocks, err = ch.LockResolver.ResolveLocksLite(bo, callerStartTS, locks) + msBeforeTxnExpired, resolvedLocks, err = ch.lockResolver.ResolveLocksLite(bo, callerStartTS, locks) } else { - msBeforeTxnExpired, resolvedLocks, err = ch.LockResolver.ResolveLocks(bo, callerStartTS, locks) + msBeforeTxnExpired, resolvedLocks, err = ch.lockResolver.ResolveLocks(bo, callerStartTS, locks) } if err != nil { return msBeforeTxnExpired, err } if len(resolvedLocks) > 0 { - ch.minCommitTSPushed.Update(resolvedLocks) + ch.resolvedLocks.Put(resolvedLocks...) return 0, nil } return msBeforeTxnExpired, nil } // SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context. -func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, sType kv.StoreType, directStoreAddr string) (*tikvrpc.Response, *RPCContext, string, error) { - sender := NewRegionRequestSender(ch.RegionCache, ch.Client) +func (ch *ClientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, sType kv.StoreType, directStoreAddr string) (*tikvrpc.Response, *RPCContext, string, error) { + sender := NewRegionRequestSender(ch.regionCache, ch.client) if len(directStoreAddr) > 0 { sender.SetStoreAddr(directStoreAddr) } sender.Stats = ch.Stats - req.Context.ResolvedLocks = ch.minCommitTSPushed.Get() + req.Context.ResolvedLocks = ch.resolvedLocks.GetAll() resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType) return resp, ctx, sender.GetStoreAddr(), err } @@ -811,7 +790,7 @@ const ( ) func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *copTask, bo *Backoffer, resp *tikvrpc.Response) { - logStr := fmt.Sprintf("[TIME_COP_PROCESS] resp_time:%s txnStartTS:%d region_id:%d store_addr:%s", costTime, worker.req.StartTs, task.region.id, task.storeAddr) + logStr := fmt.Sprintf("[TIME_COP_PROCESS] resp_time:%s txnStartTS:%d region_id:%d store_addr:%s", costTime, worker.req.StartTs, task.region.GetID(), task.storeAddr) if bo.GetTotalSleep() > minLogBackoffTime { backoffTypes := strings.Replace(fmt.Sprintf("%v", bo.GetTypes()), " ", ",", -1) logStr += fmt.Sprintf(" backoff_ms:%d backoff_types:%s", bo.GetTotalSleep(), backoffTypes) @@ -968,7 +947,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon resp.detail.Stats = worker.Stats worker.Stats = nil backoffTimes := bo.GetBackoffTimes() - resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond + resp.detail.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes)) resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes)) for backoff := range backoffTimes { @@ -1022,7 +1001,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon newCacheValue := coprCacheValue{ Data: data, TimeStamp: worker.req.StartTs, - RegionID: task.region.id, + RegionID: task.region.GetID(), RegionDataVersion: resp.pbResp.CacheLastVersion, } 
worker.store.coprCache.Set(cacheKey, &newCacheValue) @@ -1108,33 +1087,6 @@ func (it *copIterator) Close() error { return nil } -type rateLimit struct { - token chan struct{} -} - -func newRateLimit(n int) *rateLimit { - return &rateLimit{ - token: make(chan struct{}, n), - } -} - -func (r *rateLimit) getToken(done <-chan struct{}) (exit bool) { - select { - case <-done: - return true - case r.token <- struct{}{}: - return false - } -} - -func (r *rateLimit) putToken() { - select { - case <-r.token: - default: - panic("put a redundant token") - } -} - // copErrorResponse returns error when calling Next() type copErrorResponse struct{ error } diff --git a/store/tikv/coprocessor_test.go b/store/tikv/coprocessor_test.go index 6d6aa0223c198..9e893f5b2daf0 100644 --- a/store/tikv/coprocessor_test.go +++ b/store/tikv/coprocessor_test.go @@ -15,7 +15,6 @@ package tikv import ( "context" - "time" . "github.com/pingcap/check" "github.com/pingcap/tidb/kv" @@ -262,29 +261,3 @@ func (s *testCoprocessorSuite) rangeEqual(c *C, ranges []kv.KeyRange, keys ...st c.Assert(string(r.EndKey), Equals, keys[2*i+1]) } } - -func (s *testCoprocessorSuite) TestRateLimit(c *C) { - done := make(chan struct{}, 1) - rl := newRateLimit(1) - c.Assert(rl.putToken, PanicMatches, "put a redundant token") - exit := rl.getToken(done) - c.Assert(exit, Equals, false) - rl.putToken() - c.Assert(rl.putToken, PanicMatches, "put a redundant token") - - exit = rl.getToken(done) - c.Assert(exit, Equals, false) - done <- struct{}{} - exit = rl.getToken(done) // blocked but exit - c.Assert(exit, Equals, true) - - sig := make(chan int, 1) - go func() { - exit = rl.getToken(done) // blocked - c.Assert(exit, Equals, false) - close(sig) - }() - time.Sleep(200 * time.Millisecond) - rl.putToken() - <-sig -} diff --git a/store/tikv/mpp.go b/store/tikv/mpp.go index bc51260429cab..0145ad7c3ab3a 100644 --- a/store/tikv/mpp.go +++ b/store/tikv/mpp.go @@ -167,10 +167,10 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req originalTask := req.Meta.(*batchCopTask) for _, task := range originalTask.copTasks { regionInfos = append(regionInfos, &coprocessor.RegionInfo{ - RegionId: task.task.region.id, + RegionId: task.task.region.GetID(), RegionEpoch: &metapb.RegionEpoch{ - ConfVer: task.task.region.confVer, - Version: task.task.region.ver, + ConfVer: task.task.region.GetConfVer(), + Version: task.task.region.GetVer(), }, Ranges: task.task.ranges.ToPBRanges(), }) @@ -198,17 +198,17 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req // Or else it's the task without region, which always happens in high layer task without table. // In that case if len(originalTask.copTasks) != 0 { - sender := NewRegionBatchRequestSender(m.store.regionCache, m.store.client) + sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient()) rpcResp, _, _, err = sender.sendStreamReqToAddr(bo, originalTask.copTasks, wrappedReq, ReadTimeoutMedium) // No matter what the rpc error is, we won't retry the mpp dispatch tasks. // TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling. // That's a hard job but we can try it in the future. 
- if sender.rpcError != nil { - m.sendError(sender.rpcError) + if sender.GetRPCError() != nil { + m.sendError(sender.GetRPCError()) return } } else { - rpcResp, err = m.store.client.SendRequest(ctx, originalTask.storeAddr, wrappedReq, ReadTimeoutMedium) + rpcResp, err = m.store.GetTiKVClient().SendRequest(ctx, originalTask.storeAddr, wrappedReq, ReadTimeoutMedium) } if err != nil { @@ -244,7 +244,7 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques // Drain result from root task. // We don't need to process any special error. When we meet errors, just let it fail. - rpcResp, err := m.store.client.SendRequest(bo.ctx, req.Meta.GetAddress(), wrappedReq, ReadTimeoutUltraLong) + rpcResp, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, ReadTimeoutUltraLong) if err != nil { m.sendError(err) @@ -312,13 +312,14 @@ func (m *mppIterator) handleMPPStreamResponse(bo *Backoffer, response *mpp.MPPDa detail: new(CopRuntimeStats), } + backoffTimes := bo.GetBackoffTimes() resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond - resp.detail.BackoffSleep = make(map[string]time.Duration, len(bo.backoffTimes)) - resp.detail.BackoffTimes = make(map[string]int, len(bo.backoffTimes)) - for backoff := range bo.backoffTimes { + resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes)) + resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes)) + for backoff := range backoffTimes { backoffName := backoff.String() - resp.detail.BackoffTimes[backoffName] = bo.backoffTimes[backoff] - resp.detail.BackoffSleep[backoffName] = time.Duration(bo.backoffSleepMS[backoff]) * time.Millisecond + resp.detail.BackoffTimes[backoffName] = backoffTimes[backoff] + resp.detail.BackoffSleep[backoffName] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond } resp.detail.CalleeAddress = req.Meta.GetAddress() diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 47e66bc5e2df4..814be77f5cbc2 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -224,6 +224,16 @@ func (s *RegionRequestSender) GetStoreAddr() string { return s.storeAddr } +// GetRPCError returns the RPC error. +func (s *RegionRequestSender) GetRPCError() error { + return s.rpcError +} + +// SetRPCError rewrite the rpc error. +func (s *RegionRequestSender) SetRPCError(err error) { + s.rpcError = err +} + // SendReq sends a request to tikv server. func (s *RegionRequestSender) SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) { resp, _, err := s.SendReqCtx(bo, req, regionID, timeout, kv.TiKV) @@ -735,7 +745,8 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed return false, nil } -func pbIsolationLevel(level kv.IsoLevel) kvrpcpb.IsolationLevel { +// IsolationLevelToPB converts isolation level to wire type. 
+func IsolationLevelToPB(level kv.IsoLevel) kvrpcpb.IsolationLevel { switch level { case kv.RC: return kvrpcpb.IsolationLevel_RC diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 0889714db9ece..1c376fd7b5127 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -190,7 +190,7 @@ func (s *Scanner) getData(bo *Backoffer) error { Context: &pb.Context{ Priority: s.snapshot.priority, NotFillCache: s.snapshot.notFillCache, - IsolationLevel: pbIsolationLevel(s.snapshot.isolationLevel), + IsolationLevel: IsolationLevelToPB(s.snapshot.isolationLevel), }, StartKey: s.nextStartKey, EndKey: reqEndKey, diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index f8c2458967333..2f6f3015d25ae 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" + "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/execdetails" "go.uber.org/zap" @@ -58,7 +59,7 @@ type tikvSnapshot struct { keyOnly bool vars *kv.Variables replicaReadSeed uint32 - minCommitTSPushed + resolvedLocks *util.TSSet // Cache the result of BatchGet. // The invariance is that calling BatchGet multiple times using the same start ts, @@ -92,9 +93,7 @@ func newTiKVSnapshot(store *KVStore, ver kv.Version, replicaReadSeed uint32) *ti priority: pb.CommandPri_Normal, vars: kv.DefaultVars, replicaReadSeed: replicaReadSeed, - minCommitTSPushed: minCommitTSPushed{ - data: make(map[uint64]struct{}, 5), - }, + resolvedLocks: util.NewTSSet(5), } } @@ -110,7 +109,7 @@ func (s *tikvSnapshot) setSnapshotTS(ts uint64) { s.mu.cached = nil s.mu.Unlock() // And also the minCommitTS pushed information. - s.minCommitTSPushed.data = make(map[uint64]struct{}, 5) + s.resolvedLocks = util.NewTSSet(5) } // BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs. @@ -259,12 +258,7 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle } func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error { - cli := clientHelper{ - LockResolver: s.store.lockResolver, - RegionCache: s.store.regionCache, - minCommitTSPushed: &s.minCommitTSPushed, - Client: s.store.client, - } + cli := NewClientHelper(s.store, s.resolvedLocks) s.mu.RLock() if s.mu.stats != nil { cli.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) @@ -407,13 +401,7 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte } }) - cli := clientHelper{ - LockResolver: s.store.lockResolver, - RegionCache: s.store.regionCache, - minCommitTSPushed: &s.minCommitTSPushed, - Client: s.store.client, - resolveLite: true, - } + cli := NewClientHelper(s.store, s.resolvedLocks) s.mu.RLock() if s.mu.stats != nil { cli.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) @@ -515,7 +503,7 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) { case kv.IsolationLevel: s.isolationLevel = val.(kv.IsoLevel) case kv.Priority: - s.priority = kvPriorityToCommandPri(val.(int)) + s.priority = PriorityToPB(val.(int)) case kv.NotFillCache: s.notFillCache = val.(bool) case kv.SyncLog: diff --git a/store/tikv/util/rate_limit.go b/store/tikv/util/rate_limit.go new file mode 100644 index 0000000000000..04aa1f3b89e47 --- /dev/null +++ b/store/tikv/util/rate_limit.go @@ -0,0 +1,52 @@ +// Copyright 2021 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +// RateLimit wraps a fix sized channel to control concurrency. +type RateLimit struct { + capacity int + token chan struct{} +} + +// NewRateLimit creates a limit controller with capacity n. +func NewRateLimit(n int) *RateLimit { + return &RateLimit{ + capacity: n, + token: make(chan struct{}, n), + } +} + +// GetToken acquires a token. +func (r *RateLimit) GetToken(done <-chan struct{}) (exit bool) { + select { + case <-done: + return true + case r.token <- struct{}{}: + return false + } +} + +// PutToken puts a token back. +func (r *RateLimit) PutToken() { + select { + case <-r.token: + default: + panic("put a redundant token") + } +} + +// GetCapacity returns the token capacity. +func (r *RateLimit) GetCapacity() int { + return r.capacity +} diff --git a/store/tikv/util/rate_limit_test.go b/store/tikv/util/rate_limit_test.go new file mode 100644 index 0000000000000..29f4a72b75e0f --- /dev/null +++ b/store/tikv/util/rate_limit_test.go @@ -0,0 +1,46 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "time" + + . "github.com/pingcap/check" +) + +func (s *testMiscSuite) TestRateLimit(c *C) { + done := make(chan struct{}, 1) + rl := NewRateLimit(1) + c.Assert(rl.PutToken, PanicMatches, "put a redundant token") + exit := rl.GetToken(done) + c.Assert(exit, Equals, false) + rl.PutToken() + c.Assert(rl.PutToken, PanicMatches, "put a redundant token") + + exit = rl.GetToken(done) + c.Assert(exit, Equals, false) + done <- struct{}{} + exit = rl.GetToken(done) // blocked but exit + c.Assert(exit, Equals, true) + + sig := make(chan int, 1) + go func() { + exit = rl.GetToken(done) // blocked + c.Assert(exit, Equals, false) + close(sig) + }() + time.Sleep(200 * time.Millisecond) + rl.PutToken() + <-sig +} diff --git a/store/tikv/util/ts_set.go b/store/tikv/util/ts_set.go new file mode 100644 index 0000000000000..798772dd84c54 --- /dev/null +++ b/store/tikv/util/ts_set.go @@ -0,0 +1,52 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import "sync" + +// TSSet is a set of timestamps. 
+type TSSet struct { + sync.RWMutex + m map[uint64]struct{} +} + +// NewTSSet creates a set to store timestamps. +func NewTSSet(capacity int) *TSSet { + return &TSSet{ + m: make(map[uint64]struct{}, capacity), + } +} + +// Put puts timestamps into the map. +func (s *TSSet) Put(tss ...uint64) { + s.Lock() + defer s.Unlock() + for _, ts := range tss { + s.m[ts] = struct{}{} + } +} + +// GetAll returns all timestamps in the set. +func (s *TSSet) GetAll() []uint64 { + s.RLock() + defer s.RUnlock() + if len(s.m) == 0 { + return nil + } + ret := make([]uint64, 0, len(s.m)) + for ts := range s.m { + ret = append(ret, ts) + } + return ret +}
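For reference, a minimal usage sketch of the two helpers this change extracts into `store/tikv/util`, using only the API introduced above (`NewRateLimit`/`GetToken`/`PutToken`/`GetCapacity` and `NewTSSet`/`Put`/`GetAll`). The `main` wrapper and the literal timestamps are illustrative only and are not part of this patch:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/pingcap/tidb/store/tikv/util"
)

func main() {
	// RateLimit bounds concurrency with a fixed-size token channel:
	// GetToken blocks until a slot is free or exitCh is closed;
	// PutToken releases the slot when the worker finishes.
	exitCh := make(chan struct{})
	limiter := util.NewRateLimit(2)
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		if exit := limiter.GetToken(exitCh); exit {
			break
		}
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			defer limiter.PutToken()
			fmt.Println("worker", id, "running, capacity =", limiter.GetCapacity())
		}(i)
	}
	wg.Wait()

	// TSSet deduplicates the start timestamps of resolved locks; callers such
	// as ClientHelper.SendReqCtx read them back via GetAll to fill
	// kvrpcpb.Context.ResolvedLocks on subsequent requests.
	resolved := util.NewTSSet(5)
	resolved.Put(100, 200, 200) // duplicates collapse
	fmt.Println(len(resolved.GetAll())) // 2
}
```

As in the previously unexported implementation, `PutToken` panics on a redundant put, so misuse of the limiter fails loudly instead of silently growing capacity.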