Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: add error code for write conflict #12878

Merged
merged 3 commits into from
Oct 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20171020084629-8d44bfdf1030
github.com/pingcap/kvproto v0.0.0-20190826051950-fc8799546726
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20191014023410-e7801c924a44
github.com/pingcap/parser v0.0.0-20191023041603-32865d31ae3f
github.com/pingcap/pd v2.1.12+incompatible
github.com/pingcap/tidb-tools v2.1.3-0.20190116051332-34c808eef588+incompatible
github.com/pingcap/tipb v0.0.0-20180910045846-371b48b15d93
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ github.com/pingcap/kvproto v0.0.0-20190826051950-fc8799546726 h1:AzGIEmaYVYMtmki
github.com/pingcap/kvproto v0.0.0-20190826051950-fc8799546726/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20191014023410-e7801c924a44 h1:DprW0H6iFwLI3YkS/b2rqc5zvpu//myquMUKMzL26SE=
github.com/pingcap/parser v0.0.0-20191014023410-e7801c924a44/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20191023041603-32865d31ae3f h1:kvIfudXU5KGun9X5Bz6LnIbhfun6UM+NzVvkl8aPKuY=
github.com/pingcap/parser v0.0.0-20191023041603-32865d31ae3f/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v2.1.12+incompatible h1:6N3LBxx2aSZqT+IWEG730EDNDttP7dXO8J6yvBh+HXw=
github.com/pingcap/pd v2.1.12+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E=
github.com/pingcap/tidb-tools v2.1.3-0.20190116051332-34c808eef588+incompatible h1:e9Gi/LP9181HT3gBfSOeSBA+5JfemuE4aEAhqNgoE4k=
Expand Down
28 changes: 17 additions & 11 deletions kv/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
const (
codeClosed terror.ErrCode = 1
codeNotExist = 2
codeConditionNotMatch = 3
codeLockConflict = 4
codeLazyConditionPairsNotMatch = 5
codeRetryable = 6
codeCantSetNilValue = 7
Expand All @@ -38,15 +36,15 @@ const (
codeKeyExists = 1062
)

// TxnRetryableMark is used to uniform the commit error messages which could retry the transaction.
// *WARNING*: changing this string will affect the backward compatibility.
const TxnRetryableMark = "[try again later]"

var (
// ErrClosed is used when close an already closed txn.
ErrClosed = terror.ClassKV.New(codeClosed, "Error: Transaction already closed")
// ErrNotExist is used when try to get an entry with an unexist key from KV store.
ErrNotExist = terror.ClassKV.New(codeNotExist, "Error: key not exist")
// ErrConditionNotMatch is used when condition is not met.
ErrConditionNotMatch = terror.ClassKV.New(codeConditionNotMatch, "Error: Condition not match")
// ErrLockConflict is used when try to lock an already locked key.
ErrLockConflict = terror.ClassKV.New(codeLockConflict, "Error: Lock conflict")
// ErrLazyConditionPairsNotMatch is used when value in store differs from expect pairs.
ErrLazyConditionPairsNotMatch = terror.ClassKV.New(codeLazyConditionPairsNotMatch, "Error: Lazy condition pairs not match")
// ErrRetryable is used when KV store occurs RPC error or some other
Expand All @@ -69,13 +67,21 @@ var (
ErrKeyExists = terror.ClassKV.New(codeKeyExists, "key already exist")
// ErrNotImplemented returns when a function is not implemented yet.
ErrNotImplemented = terror.ClassKV.New(codeNotImplemented, "not implemented")
// ErrWriteConflict is the error when the commit meets an write conflict error.
ErrWriteConflict = terror.ClassKV.New(mysql.ErrWriteConflict,
mysql.MySQLErrName[mysql.ErrWriteConflict]+" "+TxnRetryableMark)
// ErrWriteConflictInTiDB is the error when the commit meets an write conflict error when local latch is enabled.
ErrWriteConflictInTiDB = terror.ClassKV.New(mysql.ErrWriteConflictInTiDB,
mysql.MySQLErrName[mysql.ErrWriteConflictInTiDB]+" "+TxnRetryableMark)
)

func init() {
kvMySQLErrCodes := map[terror.ErrCode]uint16{
codeKeyExists: mysql.ErrDupEntry,
codeEntryTooLarge: mysql.ErrTooBigRowsize,
codeTxnTooLarge: mysql.ErrTxnTooLarge,
codeKeyExists: mysql.ErrDupEntry,
codeEntryTooLarge: mysql.ErrTooBigRowsize,
codeTxnTooLarge: mysql.ErrTxnTooLarge,
mysql.ErrWriteConflict: mysql.ErrWriteConflict,
mysql.ErrWriteConflictInTiDB: mysql.ErrWriteConflictInTiDB,
}
terror.ErrClassToMySQLCodes[terror.ClassKV] = kvMySQLErrCodes
}
Expand All @@ -87,8 +93,8 @@ func IsRetryableError(err error) bool {
}

if ErrRetryable.Equal(err) ||
ErrLockConflict.Equal(err) ||
ErrConditionNotMatch.Equal(err) ||
ErrWriteConflict.Equal(err) ||
ErrWriteConflictInTiDB.Equal(err) ||
// TiKV exception message will tell you if you should retry or not
strings.Contains(err.Error(), "try again later") {
return true
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er
logutil.Logger(context.Background()).Debug("2PC failed commit primary key",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Annotate(err, txnRetryableMark)
return errors.Annotate(err, kv.TxnRetryableMark)
}

c.mu.Lock()
Expand Down Expand Up @@ -685,7 +685,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
if c.store.oracle.IsExpired(c.startTS, c.maxTxnTimeUse) {
err = errors.Errorf("conn%d txn takes too much time, txnStartTS: %d, comm: %d",
c.connID, c.startTS, c.commitTS)
return errors.Annotate(err, txnRetryableMark)
return errors.Annotate(err, kv.TxnRetryableMark)
}

start = time.Now()
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"golang.org/x/net/context"
Expand Down Expand Up @@ -223,7 +224,7 @@ func (s *testCommitterSuite) TestContextCancelRetryable(c *C) {
c.Assert(err, IsNil)
err = txn2.Commit(context.Background())
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), txnRetryableMark), IsTrue)
c.Assert(kv.ErrWriteConflictInTiDB.Equal(err), IsTrue, Commentf("err: %s", err))
}

func (s *testCommitterSuite) mustGetRegionID(c *C, key []byte) uint64 {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (t backoffType) TError() error {
case BoTxnLock, boTxnLockFast:
return ErrResolveLockTimeout
case boPDRPC:
return ErrPDServerTimeout.GenWithStackByArgs(txnRetryableMark)
return ErrPDServerTimeout.GenWithStackByArgs(kv.TxnRetryableMark)
case BoRegionMiss, BoUpdateLeader:
return ErrRegionUnavailable
case boServerBusy:
Expand Down
17 changes: 5 additions & 12 deletions store/tikv/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/kv"
)

var (
Expand All @@ -27,21 +28,13 @@ var (
// mismatchClusterID represents the message that the cluster ID of the PD client does not match the PD.
const mismatchClusterID = "mismatch cluster id"

// TiDB decides whether to retry transaction by checking if error message contains
// string "try again later" literally.
// In TiClient we use `errors.Annotate(err, txnRetryableMark)` to direct TiDB to
// restart a transaction.
// Note that it should be only used if i) the error occurs inside a transaction
// and ii) the error is not totally unexpected and hopefully will recover soon.
const txnRetryableMark = "[try again later]"

// MySQL error instances.
var (
ErrTiKVServerTimeout = terror.ClassTiKV.New(mysql.ErrTiKVServerTimeout, mysql.MySQLErrName[mysql.ErrTiKVServerTimeout]+txnRetryableMark)
ErrResolveLockTimeout = terror.ClassTiKV.New(mysql.ErrResolveLockTimeout, mysql.MySQLErrName[mysql.ErrResolveLockTimeout]+txnRetryableMark)
ErrTiKVServerTimeout = terror.ClassTiKV.New(mysql.ErrTiKVServerTimeout, mysql.MySQLErrName[mysql.ErrTiKVServerTimeout]+kv.TxnRetryableMark)
ErrResolveLockTimeout = terror.ClassTiKV.New(mysql.ErrResolveLockTimeout, mysql.MySQLErrName[mysql.ErrResolveLockTimeout]+kv.TxnRetryableMark)
ErrPDServerTimeout = terror.ClassTiKV.New(mysql.ErrPDServerTimeout, mysql.MySQLErrName[mysql.ErrPDServerTimeout]+"%v")
ErrRegionUnavailable = terror.ClassTiKV.New(mysql.ErrRegionUnavailable, mysql.MySQLErrName[mysql.ErrRegionUnavailable]+txnRetryableMark)
ErrTiKVServerBusy = terror.ClassTiKV.New(mysql.ErrTiKVServerBusy, mysql.MySQLErrName[mysql.ErrTiKVServerBusy]+txnRetryableMark)
ErrRegionUnavailable = terror.ClassTiKV.New(mysql.ErrRegionUnavailable, mysql.MySQLErrName[mysql.ErrRegionUnavailable]+kv.TxnRetryableMark)
ErrTiKVServerBusy = terror.ClassTiKV.New(mysql.ErrTiKVServerBusy, mysql.MySQLErrName[mysql.ErrTiKVServerBusy]+kv.TxnRetryableMark)
ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly])
)

Expand Down
2 changes: 1 addition & 1 deletion store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (d Driver) Open(path string) (kv.Storage, error) {

if err != nil {
if strings.Contains(err.Error(), "i/o timeout") {
return nil, errors.Annotate(err, txnRetryableMark)
return nil, errors.Annotate(err, kv.TxnRetryableMark)
}
return nil, errors.Trace(err)
}
Expand Down
13 changes: 4 additions & 9 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,12 @@ func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) {
return NewLock(locked), nil
}
if keyErr.Conflict != nil {
err := errors.New(conflictToString(keyErr.Conflict))
return nil, errors.Annotate(err, txnRetryableMark)
return nil, newWriteConflictError(keyErr.Conflict)
}
if keyErr.Retryable != "" {
err := errors.Errorf("tikv restarts txn: %s", keyErr.GetRetryable())
logutil.Logger(context.Background()).Debug("error", zap.Error(err))
return nil, errors.Annotate(err, txnRetryableMark)
return nil, errors.Annotate(err, kv.TxnRetryableMark)
}
if keyErr.Abort != "" {
err := errors.Errorf("tikv aborts txn: %s", keyErr.GetAbort())
Expand All @@ -316,16 +315,12 @@ func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) {
return nil, errors.Errorf("unexpected KeyError: %s", keyErr.String())
}

func conflictToString(conflict *pb.WriteConflict) string {
func newWriteConflictError(conflict *pb.WriteConflict) error {
var buf bytes.Buffer
_, err := fmt.Fprintf(&buf, "WriteConflict: txnStartTS=%d, conflictTS=%d, key=", conflict.StartTs, conflict.ConflictTs)
if err != nil {
logutil.Logger(context.Background()).Error("error", zap.Error(err))
}
prettyWriteKey(&buf, conflict.Key)
buf.WriteString(" primary=")
prettyWriteKey(&buf, conflict.Primary)
return buf.String()
return kv.ErrWriteConflict.GenWithStackByArgs(conflict.StartTs, conflict.ConflictTs, buf.String())
}

func prettyWriteKey(buf *bytes.Buffer, key []byte) {
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,6 @@ func (s *testSnapshotSuite) TestWriteConflictPrettyFormat(c *C) {
Primary: []byte{116, 128, 0, 0, 0, 0, 0, 1, 155, 95, 105, 128, 0, 0, 0, 0, 0, 0, 1, 1, 82, 87, 48, 49, 0, 0, 0, 0, 251, 1, 55, 54, 56, 50, 50, 49, 49, 48, 255, 57, 0, 0, 0, 0, 0, 0, 0, 248, 1, 0, 0, 0, 0, 0, 0, 0, 0, 247},
}

expectedStr := `WriteConflict: txnStartTS=399402937522847774, conflictTS=399402937719455772, key={tableID=411, indexID=1, indexValues={RW01, 768221109, , }} primary={tableID=411, indexID=1, indexValues={RW01, 768221109, , }}`
c.Assert(conflictToString(conflict), Equals, expectedStr)
expectedStr := `[kv:9007]Write conflict, txnStartTS=399402937522847774, conflictTS=399402937719455772, key={tableID=411, indexID=1, indexValues={RW01, 768221109, , }} primary={tableID=411, indexID=1, indexValues={RW01, 768221109, , }} [try again later]`
c.Assert(newWriteConflictError(conflict).Error(), Equals, expectedStr)
}
3 changes: 1 addition & 2 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
}
defer txn.store.txnLatches.UnLock(lock)
if lock.IsStale() {
err = errors.Errorf("txnStartTS %d is stale", txn.startTS)
return errors.Annotate(err, txnRetryableMark)
return kv.ErrWriteConflictInTiDB.GenWithStackByArgs(txn.startTS)
}
err = committer.executeAndWriteFinishBinlog(ctx)
if err == nil {
Expand Down