Skip to content

Commit

Permalink
Merge branch 'master' into copr/batch_cop_iter
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreMouche authored May 7, 2021
2 parents 8f155e1 + ff689c0 commit b44d29a
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 12 deletions.
21 changes: 12 additions & 9 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
tidbmetrics "github.com/pingcap/tidb/metrics"
txndriver "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/tikv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/logutil"
Expand Down Expand Up @@ -273,7 +274,7 @@ type copIteratorWorker struct {
respChan chan<- *copResponse
finishCh <-chan struct{}
vars *tikv.Variables
*tikv.ClientHelper
kvclient *tikv.ClientHelper

memTracker *memory.Tracker

Expand Down Expand Up @@ -402,7 +403,7 @@ func (it *copIterator) open(ctx context.Context, enabledRateLimitAction bool) {
respChan: it.respChan,
finishCh: it.finishCh,
vars: it.vars,
ClientHelper: tikv.NewClientHelper(it.store.store, it.resolvedLocks),
kvclient: tikv.NewClientHelper(it.store.store, it.resolvedLocks),
memTracker: it.memTracker,
replicaReadSeed: it.replicaReadSeed,
actionOnExceed: it.actionOnExceed,
Expand Down Expand Up @@ -706,8 +707,8 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *tikv.Backoffer, task *copTas
})
req.StoreTp = getEndPointType(task.storeType)
startTime := time.Now()
if worker.Stats == nil {
worker.Stats = make(map[tikvrpc.CmdType]*tikv.RPCRuntimeStats)
if worker.kvclient.Stats == nil {
worker.kvclient.Stats = make(map[tikvrpc.CmdType]*tikv.RPCRuntimeStats)
}
if worker.req.IsStaleness {
req.EnableStaleRead()
Expand All @@ -716,7 +717,8 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *tikv.Backoffer, task *copTas
if len(worker.req.MatchStoreLabels) > 0 {
ops = append(ops, tikv.WithMatchLabels(worker.req.MatchStoreLabels))
}
resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, tikv.ReadTimeoutMedium, getEndPointType(task.storeType), task.storeAddr, ops...)
resp, rpcCtx, storeAddr, err := worker.kvclient.SendReqCtx(bo, req, task.region, tikv.ReadTimeoutMedium, getEndPointType(task.storeType), task.storeAddr, ops...)
err = txndriver.ToTiDBErr(err)
if err != nil {
if task.storeType == kv.TiDB {
err = worker.handleTiDBSendReqErr(err, task, ch)
Expand Down Expand Up @@ -872,7 +874,8 @@ func (worker *copIteratorWorker) handleCopResponse(bo *tikv.Backoffer, rpcCtx *t
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
logutil.BgLogger().Debug("coprocessor encounters",
zap.Stringer("lock", lockErr))
msBeforeExpired, err1 := worker.ResolveLocks(bo, worker.req.StartTs, []*tikv.Lock{tikv.NewLock(lockErr)})
msBeforeExpired, err1 := worker.kvclient.ResolveLocks(bo, worker.req.StartTs, []*tikv.Lock{tikv.NewLock(lockErr)})
err1 = txndriver.ToTiDBErr(err1)
if err1 != nil {
return nil, errors.Trace(err1)
}
Expand Down Expand Up @@ -901,8 +904,8 @@ func (worker *copIteratorWorker) handleCopResponse(bo *tikv.Backoffer, rpcCtx *t
if resp.detail == nil {
resp.detail = new(CopRuntimeStats)
}
resp.detail.Stats = worker.Stats
worker.Stats = nil
resp.detail.Stats = worker.kvclient.Stats
worker.kvclient.Stats = nil
backoffTimes := bo.GetBackoffTimes()
resp.detail.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
Expand Down Expand Up @@ -980,7 +983,7 @@ type CopRuntimeStats struct {
func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error {
errCode := errno.ErrUnknown
errMsg := err.Error()
if terror.ErrorEqual(err, tikverr.ErrTiKVServerTimeout) {
if terror.ErrorEqual(err, txndriver.ErrTiKVServerTimeout) {
errCode = errno.ErrTiKVServerTimeout
errMsg = "TiDB server timeout, address is " + task.storeAddr
}
Expand Down
6 changes: 6 additions & 0 deletions store/driver/txn/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ var (
ErrTiFlashServerBusy = dbterror.ClassTiKV.NewStd(errno.ErrTiFlashServerBusy)
// ErrPDServerTimeout is the error when pd server is timeout.
ErrPDServerTimeout = dbterror.ClassTiKV.NewStd(errno.ErrPDServerTimeout)
// ErrRegionUnavailable is the error when region is not available.
ErrRegionUnavailable = dbterror.ClassTiKV.NewStd(errno.ErrRegionUnavailable)
)

func genKeyExistsError(name string, value string, err error) error {
Expand Down Expand Up @@ -235,6 +237,10 @@ func ToTiDBErr(err error) error {
return ErrResolveLockTimeout
}

if errors.ErrorEqual(err, tikverr.ErrRegionUnavailable) {
return ErrRegionUnavailable
}

return errors.Trace(originErr)
}

Expand Down
2 changes: 0 additions & 2 deletions store/tikv/error/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ const (
CodeLockAcquireFailAndNoWaitSet = 3572

// TiKV/PD/TiFlash errors.
CodeRegionUnavailable = 9005

CodeTiKVStoreLimit = 9008

CodeTiFlashServerTimeout = 9012
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ var (
ErrTiKVServerBusy = errors.New("tikv server busy")
// ErrTiFlashServerBusy is the error that tiflash server is busy.
ErrTiFlashServerBusy = errors.New("tiflash server busy")
// ErrRegionUnavailable is the error when region is not available.
ErrRegionUnavailable = errors.New("region unavailable")
)

// MismatchClusterID represents the message that the cluster ID of the PD client does not match the PD.
Expand All @@ -54,7 +56,6 @@ const MismatchClusterID = "mismatch cluster id"
// error instances.
var (
ErrTiFlashServerTimeout = dbterror.ClassTiKV.NewStd(CodeTiFlashServerTimeout)
ErrRegionUnavailable = dbterror.ClassTiKV.NewStd(CodeRegionUnavailable)
ErrQueryInterrupted = dbterror.ClassTiKV.NewStd(CodeQueryInterrupted)
ErrLockAcquireFailAndNoWaitSet = dbterror.ClassTiKV.NewStd(CodeLockAcquireFailAndNoWaitSet)
ErrLockWaitTimeout = dbterror.ClassTiKV.NewStd(CodeLockWaitTimeout)
Expand Down

0 comments on commit b44d29a

Please sign in to comment.