Skip to content

Commit

Permalink
store/driver: move error to single package
Browse files Browse the repository at this point in the history
Signed-off-by: shirly <[email protected]>
  • Loading branch information
AndreMouche committed May 11, 2021
1 parent 7611952 commit 025f441
Show file tree
Hide file tree
Showing 14 changed files with 242 additions and 217 deletions.
12 changes: 6 additions & 6 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
txndriver "github.com/pingcap/tidb/store/driver/txn"
storeerr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -536,15 +536,15 @@ func (s *testPointGetSuite) TestSelectCheckVisibility(c *C) {
c.Assert(expectErr.Equal(err), IsTrue)
}
// Test point get.
checkSelectResultError("select * from t where a='1'", txndriver.ErrGCTooEarly)
checkSelectResultError("select * from t where a='1'", storeerr.ErrGCTooEarly)
// Test batch point get.
checkSelectResultError("select * from t where a in ('1','2')", txndriver.ErrGCTooEarly)
checkSelectResultError("select * from t where a in ('1','2')", storeerr.ErrGCTooEarly)
// Test Index look up read.
checkSelectResultError("select * from t where b > 0 ", txndriver.ErrGCTooEarly)
checkSelectResultError("select * from t where b > 0 ", storeerr.ErrGCTooEarly)
// Test Index read.
checkSelectResultError("select b from t where b > 0 ", txndriver.ErrGCTooEarly)
checkSelectResultError("select b from t where b > 0 ", storeerr.ErrGCTooEarly)
// Test table read.
checkSelectResultError("select * from t", txndriver.ErrGCTooEarly)
checkSelectResultError("select * from t", storeerr.ErrGCTooEarly)
}

func (s *testPointGetSuite) TestReturnValues(c *C) {
Expand Down
8 changes: 4 additions & 4 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
txndriver "github.com/pingcap/tidb/store/driver/txn"
storeerr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/arena"
Expand Down Expand Up @@ -1569,7 +1569,7 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
retryable, err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1)
if err != nil {
_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
if allowTiFlashFallback && errors.ErrorEqual(err, txndriver.ErrTiFlashServerTimeout) && retryable {
if allowTiFlashFallback && errors.ErrorEqual(err, storeerr.ErrTiFlashServerTimeout) && retryable {
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
warns := append(parserWarns, stmtctx.SQLWarn{Level: stmtctx.WarnLevelError, Err: err})
Expand Down Expand Up @@ -1870,10 +1870,10 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
failpoint.Inject("fetchNextErr", func(value failpoint.Value) {
switch value.(string) {
case "firstNext":
failpoint.Return(firstNext, txndriver.ErrTiFlashServerTimeout)
failpoint.Return(firstNext, storeerr.ErrTiFlashServerTimeout)
case "secondNext":
if !firstNext {
failpoint.Return(firstNext, txndriver.ErrTiFlashServerTimeout)
failpoint.Return(firstNext, storeerr.ErrTiFlashServerTimeout)
}
}
})
Expand Down
4 changes: 2 additions & 2 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import (
"github.com/pingcap/tidb/metrics"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/stmtctx"
txndriver "github.com/pingcap/tidb/store/driver/txn"
storeerr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/execdetails"
Expand Down Expand Up @@ -198,7 +198,7 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
ctx = context.WithValue(ctx, util.ExecDetailsKey, &util.ExecDetails{})
retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor)
_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
if allowTiFlashFallback && err != nil && errors.ErrorEqual(err, txndriver.ErrTiFlashServerTimeout) && retryable {
if allowTiFlashFallback && err != nil && errors.ErrorEqual(err, storeerr.ErrTiFlashServerTimeout) && retryable {
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
prevErr := err
Expand Down
42 changes: 21 additions & 21 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
txndriver "github.com/pingcap/tidb/store/driver/txn"
storeerr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -611,7 +611,7 @@ func (s *testPessimisticSuite) TestWaitLockKill(c *C) {
_, err := tk2.Exec("update test_kill set c = c + 1 where id = 1")
wg.Done()
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, txndriver.ErrQueryInterrupted), IsTrue)
c.Assert(terror.ErrorEqual(err, storeerr.ErrQueryInterrupted), IsTrue)
tk.MustExec("rollback")
}

Expand Down Expand Up @@ -733,10 +733,10 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {

timeoutErr := <-timeoutErrCh
c.Assert(timeoutErr, NotNil)
c.Assert(timeoutErr.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
c.Assert(timeoutErr.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())
timeoutErr = <-timeoutErrCh
c.Assert(timeoutErr, NotNil)
c.Assert(timeoutErr.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
c.Assert(timeoutErr.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())

// tk4 lock c1 = 2
tk4.MustExec("begin pessimistic")
Expand All @@ -749,7 +749,7 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
_, err := tk2.Exec("delete from tk where c1 = 2")
c.Check(time.Since(start), GreaterEqual, 1000*time.Millisecond)
c.Check(time.Since(start), Less, 3000*time.Millisecond) // unit test diff should not be too big
c.Check(err.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
c.Check(err.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())

tk4.MustExec("commit")

Expand All @@ -767,7 +767,7 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
_, err = tk2.Exec("delete from tk where c1 = 3") // tk2 tries to lock c1 = 3 fail, this delete should be rollback, but previous update should be keeped
c.Check(time.Since(start), GreaterEqual, 1000*time.Millisecond)
c.Check(time.Since(start), Less, 3000*time.Millisecond) // unit test diff should not be too big
c.Check(err.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
c.Check(err.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())

tk2.MustExec("commit")
tk3.MustExec("commit")
Expand Down Expand Up @@ -841,7 +841,7 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeoutWaitStart(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/PessimisticLockErrWriteConflict"), IsNil)
waitErr := <-done
c.Assert(waitErr, NotNil)
c.Check(waitErr.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
c.Check(waitErr.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())
c.Check(duration, GreaterEqual, 1000*time.Millisecond)
c.Check(duration, LessEqual, 3000*time.Millisecond)
tk2.MustExec("rollback")
Expand Down Expand Up @@ -1131,11 +1131,11 @@ func (s *testPessimisticSuite) TestPessimisticLockNonExistsKey(c *C) {

tk1.MustExec("begin pessimistic")
err := tk1.ExecToErr("select * from t where k = 2 for update nowait")
c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
c.Check(storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
err = tk1.ExecToErr("select * from t where k = 4 for update nowait")
c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
c.Check(storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
err = tk1.ExecToErr("select * from t where k = 7 for update nowait")
c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
c.Check(storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
tk.MustExec("rollback")
tk1.MustExec("rollback")

Expand All @@ -1147,9 +1147,9 @@ func (s *testPessimisticSuite) TestPessimisticLockNonExistsKey(c *C) {

tk1.MustExec("begin pessimistic")
err = tk1.ExecToErr("select * from t where k = 2 for update nowait")
c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
c.Check(storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
err = tk1.ExecToErr("select * from t where k = 6 for update nowait")
c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
c.Check(storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
tk.MustExec("rollback")
tk1.MustExec("rollback")
}
Expand Down Expand Up @@ -1279,10 +1279,10 @@ func (s *testPessimisticSuite) TestBatchPointGetLockIndex(c *C) {
tk2.MustExec("begin pessimistic")
err := tk2.ExecToErr("insert into t1 values(2, 2, 2)")
c.Assert(err, NotNil)
c.Assert(txndriver.ErrLockWaitTimeout.Equal(err), IsTrue)
c.Assert(storeerr.ErrLockWaitTimeout.Equal(err), IsTrue)
err = tk2.ExecToErr("select * from t1 where c2 = 3 for update nowait")
c.Assert(err, NotNil)
c.Assert(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
c.Assert(storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue)
tk.MustExec("rollback")
tk2.MustExec("rollback")
}
Expand Down Expand Up @@ -1429,12 +1429,12 @@ func (s *testPessimisticSuite) TestGenerateColPointGet(c *C) {
tk2.MustExec("begin pessimistic")
err := tk2.ExecToErr("select * from tu where z = 3 for update nowait")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, txndriver.ErrLockAcquireFailAndNoWaitSet), IsTrue)
c.Assert(terror.ErrorEqual(err, storeerr.ErrLockAcquireFailAndNoWaitSet), IsTrue)
tk.MustExec("begin pessimistic")
tk.MustExec("insert into tu(x, y) values(2, 2);")
err = tk2.ExecToErr("select * from tu where z = 4 for update nowait")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, txndriver.ErrLockAcquireFailAndNoWaitSet), IsTrue)
c.Assert(terror.ErrorEqual(err, storeerr.ErrLockAcquireFailAndNoWaitSet), IsTrue)

// test batch point get lock
tk.MustExec("begin pessimistic")
Expand All @@ -1443,12 +1443,12 @@ func (s *testPessimisticSuite) TestGenerateColPointGet(c *C) {
tk2.MustExec("begin pessimistic")
err = tk2.ExecToErr("select x from tu where z in (3, 7, 9) for update nowait")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, txndriver.ErrLockAcquireFailAndNoWaitSet), IsTrue)
c.Assert(terror.ErrorEqual(err, storeerr.ErrLockAcquireFailAndNoWaitSet), IsTrue)
tk.MustExec("begin pessimistic")
tk.MustExec("insert into tu(x, y) values(5, 6);")
err = tk2.ExecToErr("select * from tu where z = 11 for update nowait")
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, txndriver.ErrLockAcquireFailAndNoWaitSet), IsTrue)
c.Assert(terror.ErrorEqual(err, storeerr.ErrLockAcquireFailAndNoWaitSet), IsTrue)

tk.MustExec("commit")
tk2.MustExec("commit")
Expand Down Expand Up @@ -1996,11 +1996,11 @@ func (s *testPessimisticSuite) TestSelectForUpdateWaitSeconds(c *C) {
waitErr2 := <-errCh
waitErr3 := <-errCh
c.Assert(waitErr, NotNil)
c.Check(waitErr.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
c.Check(waitErr.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())
c.Assert(waitErr2, NotNil)
c.Check(waitErr2.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
c.Check(waitErr2.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())
c.Assert(waitErr3, NotNil)
c.Check(waitErr3.Error(), Equals, txndriver.ErrLockWaitTimeout.Error())
c.Check(waitErr3.Error(), Equals, storeerr.ErrLockWaitTimeout.Error())
c.Assert(time.Since(start).Seconds(), Less, 45.0)
tk2.MustExec("commit")
tk3.MustExec("rollback")
Expand Down
6 changes: 3 additions & 3 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
txndriver "github.com/pingcap/tidb/store/driver/txn"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
Expand Down Expand Up @@ -261,7 +261,7 @@ func (b *batchCopIterator) recvFromRespCh(ctx context.Context) (resp *batchCopRe
return
case <-ticker.C:
if atomic.LoadUint32(b.vars.Killed) == 1 {
resp = &batchCopResponse{err: txndriver.ErrQueryInterrupted}
resp = &batchCopResponse{err: derr.ErrQueryInterrupted}
ok = true
return
}
Expand Down Expand Up @@ -387,7 +387,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
} else {
logutil.BgLogger().Info("stream unknown error", zap.Error(err))
}
return txndriver.ErrTiFlashServerTimeout
return derr.ErrTiFlashServerTimeout
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
tidbmetrics "github.com/pingcap/tidb/metrics"
txndriver "github.com/pingcap/tidb/store/driver/txn"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
Expand Down Expand Up @@ -476,7 +476,7 @@ func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan *copRes
return
case <-ticker.C:
if atomic.LoadUint32(it.vars.Killed) == 1 {
resp = &copResponse{err: txndriver.ErrQueryInterrupted}
resp = &copResponse{err: derr.ErrQueryInterrupted}
ok = true
return
}
Expand Down Expand Up @@ -717,7 +717,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *backoffer, task *copTask, ch
ops = append(ops, tikv.WithMatchLabels(worker.req.MatchStoreLabels))
}
resp, rpcCtx, storeAddr, err := worker.kvclient.SendReqCtx(bo.TiKVBackoffer(), req, task.region, tikv.ReadTimeoutMedium, getEndPointType(task.storeType), task.storeAddr, ops...)
err = txndriver.ToTiDBErr(err)
err = derr.ToTiDBErr(err)
if err != nil {
if task.storeType == kv.TiDB {
err = worker.handleTiDBSendReqErr(err, task, ch)
Expand Down Expand Up @@ -874,7 +874,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *backoffer, rpcCtx *tikv.R
logutil.BgLogger().Debug("coprocessor encounters",
zap.Stringer("lock", lockErr))
msBeforeExpired, err1 := worker.kvclient.ResolveLocks(bo.TiKVBackoffer(), worker.req.StartTs, []*tikv.Lock{tikv.NewLock(lockErr)})
err1 = txndriver.ToTiDBErr(err1)
err1 = derr.ToTiDBErr(err1)
if err1 != nil {
return nil, errors.Trace(err1)
}
Expand Down Expand Up @@ -982,11 +982,11 @@ type CopRuntimeStats struct {
func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error {
errCode := errno.ErrUnknown
errMsg := err.Error()
if terror.ErrorEqual(err, txndriver.ErrTiKVServerTimeout) {
if terror.ErrorEqual(err, derr.ErrTiKVServerTimeout) {
errCode = errno.ErrTiKVServerTimeout
errMsg = "TiDB server timeout, address is " + task.storeAddr
}
if terror.ErrorEqual(err, txndriver.ErrTiFlashServerTimeout) {
if terror.ErrorEqual(err, derr.ErrTiFlashServerTimeout) {
errCode = errno.ErrTiFlashServerTimeout
errMsg = "TiDB server timeout, address is " + task.storeAddr
}
Expand Down
14 changes: 7 additions & 7 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/tidb/kv"
txndriver "github.com/pingcap/tidb/store/driver/txn"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand Down Expand Up @@ -225,7 +225,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *backoffer, req
if sender.GetRPCError() != nil {
logutil.BgLogger().Error("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()))
// we return timeout to trigger tikv's fallback
m.sendError(txndriver.ErrTiFlashServerTimeout)
m.sendError(derr.ErrTiFlashServerTimeout)
return
}
} else {
Expand All @@ -235,7 +235,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *backoffer, req
if err != nil {
logutil.BgLogger().Error("mpp dispatch meet error", zap.String("error", err.Error()))
// we return timeout to trigger tikv's fallback
m.sendError(txndriver.ErrTiFlashServerTimeout)
m.sendError(derr.ErrTiFlashServerTimeout)
return
}

Expand All @@ -255,7 +255,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *backoffer, req
failpoint.Inject("mppNonRootTaskError", func(val failpoint.Value) {
if val.(bool) && !req.IsRoot {
time.Sleep(1 * time.Second)
m.sendError(txndriver.ErrTiFlashServerTimeout)
m.sendError(derr.ErrTiFlashServerTimeout)
return
}
})
Expand Down Expand Up @@ -318,7 +318,7 @@ func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchReques
if err != nil {
logutil.BgLogger().Error("establish mpp connection meet error", zap.String("error", err.Error()))
// we return timeout to trigger tikv's fallback
m.sendError(txndriver.ErrTiFlashServerTimeout)
m.sendError(derr.ErrTiFlashServerTimeout)
return
}

Expand Down Expand Up @@ -350,7 +350,7 @@ func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchReques
logutil.BgLogger().Info("stream unknown error", zap.Error(err))
}
}
m.sendError(txndriver.ErrTiFlashServerTimeout)
m.sendError(derr.ErrTiFlashServerTimeout)
return
}
}
Expand Down Expand Up @@ -405,7 +405,7 @@ func (m *mppIterator) nextImpl(ctx context.Context) (resp *mppResponse, ok bool,
return
case <-ticker.C:
if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 {
err = txndriver.ErrQueryInterrupted
err = derr.ErrQueryInterrupted
exit = true
return
}
Expand Down
Loading

0 comments on commit 025f441

Please sign in to comment.