diff --git a/executor/executor.go b/executor/executor.go
index e5d5d44efefe3..1666f6955bba9 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -62,6 +62,7 @@ import (
 	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/memory"
+	"github.com/pingcap/tidb/util/resourcegrouptag"
 	"go.uber.org/zap"
 )
 
@@ -971,6 +972,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
 }
 
 func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.LockCtx {
+	_, sqlDigest := seVars.StmtCtx.SQLDigest()
 	return &tikvstore.LockCtx{
 		Killed:           &seVars.Killed,
 		ForUpdateTS:      seVars.TxnCtx.GetForUpdateTS(),
@@ -980,6 +982,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.Loc
 		LockKeysDuration: &seVars.StmtCtx.LockKeysDuration,
 		LockKeysCount:    &seVars.StmtCtx.LockKeysCount,
 		LockExpired:      &seVars.TxnCtx.LockExpire,
+		ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(sqlDigest),
 	}
 }
 
diff --git a/store/mockstore/unistore/tikv/deadlock.go b/store/mockstore/unistore/tikv/deadlock.go
index 6641a500e2cc1..de2eaf8fa61d9 100644
--- a/store/mockstore/unistore/tikv/deadlock.go
+++ b/store/mockstore/unistore/tikv/deadlock.go
@@ -44,7 +44,10 @@ type DetectorServer struct {
 func (ds *DetectorServer) Detect(req *deadlockPb.DeadlockRequest) *deadlockPb.DeadlockResponse {
 	switch req.Tp {
 	case deadlockPb.DeadlockRequestType_Detect:
-		err := ds.Detector.Detect(req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash)
+		err := ds.Detector.Detect(req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash, diagnosticContext{
+			key:              req.Entry.Key,
+			resourceGroupTag: req.Entry.ResourceGroupTag,
+		})
 		if err != nil {
 			resp := convertErrToResp(err, req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash)
 			return resp
@@ -178,30 +181,35 @@ func (dt *DetectorClient) recvLoop(streamCli deadlockPb.Deadlock_DetectClient) {
 }
 
 func (dt *DetectorClient) handleRemoteTask(requestType deadlockPb.DeadlockRequestType,
-	txnTs uint64, waitForTxnTs uint64, keyHash uint64) {
+	txnTs uint64, waitForTxnTs uint64, keyHash uint64, diagCtx diagnosticContext) {
 	detectReq := &deadlockPb.DeadlockRequest{}
 	detectReq.Tp = requestType
 	detectReq.Entry.Txn = txnTs
 	detectReq.Entry.WaitForTxn = waitForTxnTs
 	detectReq.Entry.KeyHash = keyHash
+	detectReq.Entry.Key = diagCtx.key
+	detectReq.Entry.ResourceGroupTag = diagCtx.resourceGroupTag
 	dt.sendCh <- detectReq
 }
 
 // CleanUp processes cleaup task on local detector
 // user interfaces
 func (dt *DetectorClient) CleanUp(startTs uint64) {
-	dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUp, startTs, 0, 0)
+	dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUp, startTs, 0, 0, diagnosticContext{})
}
 
 // CleanUpWaitFor cleans up the specific wait edge in detector's wait map
 func (dt *DetectorClient) CleanUpWaitFor(txnTs, waitForTxn, keyHash uint64) {
-	dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUpWaitFor, txnTs, waitForTxn, keyHash)
+	dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUpWaitFor, txnTs, waitForTxn, keyHash, diagnosticContext{})
 }
 
 // Detect post the detection request to local deadlock detector or remote first region leader,
 // the caller should use `waiter.ch` to receive possible deadlock response
-func (dt *DetectorClient) Detect(txnTs uint64, waitForTxnTs uint64, keyHash uint64) {
-	dt.handleRemoteTask(deadlockPb.DeadlockRequestType_Detect, txnTs, waitForTxnTs, keyHash)
+func (dt *DetectorClient) Detect(txnTs uint64, waitForTxnTs uint64, keyHash uint64, key []byte, resourceGroupTag []byte) {
+	dt.handleRemoteTask(deadlockPb.DeadlockRequestType_Detect, txnTs, waitForTxnTs, keyHash, diagnosticContext{
+		key:              key,
+		resourceGroupTag: resourceGroupTag,
+	})
 }
 
 // convertErrToResp converts `ErrDeadlock` to `DeadlockResponse` proto type
@@ -213,6 +221,18 @@ func convertErrToResp(errDeadlock *ErrDeadlock, txnTs, waitForTxnTs, keyHash uin
 	resp := &deadlockPb.DeadlockResponse{}
 	resp.Entry = entry
 	resp.DeadlockKeyHash = errDeadlock.DeadlockKeyHash
+
+	resp.WaitChain = make([]*deadlockPb.WaitForEntry, 0, len(errDeadlock.WaitChain))
+	for _, item := range errDeadlock.WaitChain {
+		resp.WaitChain = append(resp.WaitChain, &deadlockPb.WaitForEntry{
+			Txn:              item.Txn,
+			WaitForTxn:       item.WaitForTxn,
+			KeyHash:          item.KeyHash,
+			Key:              item.Key,
+			ResourceGroupTag: item.ResourceGroupTag,
+		})
+	}
+
 	return resp
 }
 
diff --git a/store/mockstore/unistore/tikv/detector.go b/store/mockstore/unistore/tikv/detector.go
index 0273bed5fe6a8..a27adb3f35b6c 100644
--- a/store/mockstore/unistore/tikv/detector.go
+++ b/store/mockstore/unistore/tikv/detector.go
@@ -30,6 +30,7 @@ import (
 	"sync"
 	"time"
 
+	deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
 	"github.com/pingcap/log"
 	"go.uber.org/zap"
 )
@@ -54,6 +55,12 @@ type txnKeyHashPair struct {
 	txn          uint64
 	keyHash      uint64
 	registerTime time.Time
+	diagCtx      diagnosticContext
+}
+
+type diagnosticContext struct {
+	key              []byte
+	resourceGroupTag []byte
 }
 
 func (p *txnKeyHashPair) isExpired(ttl time.Duration, nowTime time.Time) bool {
@@ -75,13 +82,27 @@ func NewDetector(ttl time.Duration, urgentSize uint64, expireInterval time.Durat
 }
 
 // Detect detects deadlock for the sourceTxn on a locked key.
-func (d *Detector) Detect(sourceTxn, waitForTxn, keyHash uint64) *ErrDeadlock {
+func (d *Detector) Detect(sourceTxn, waitForTxn, keyHash uint64, diagCtx diagnosticContext) *ErrDeadlock {
 	d.lock.Lock()
 	nowTime := time.Now()
 	d.activeExpire(nowTime)
 	err := d.doDetect(nowTime, sourceTxn, waitForTxn)
 	if err == nil {
-		d.register(sourceTxn, waitForTxn, keyHash)
+		d.register(sourceTxn, waitForTxn, keyHash, diagCtx)
+	} else {
+		// Reverse the wait chain so that the order will be each one waiting for the next one, and append the current
+		// entry that finally caused the deadlock.
+		for i := 0; i < len(err.WaitChain)/2; i++ {
+			j := len(err.WaitChain) - i - 1
+			err.WaitChain[i], err.WaitChain[j] = err.WaitChain[j], err.WaitChain[i]
+		}
+		err.WaitChain = append(err.WaitChain, &deadlockPB.WaitForEntry{
+			Txn:              sourceTxn,
+			Key:              diagCtx.key,
+			KeyHash:          keyHash,
+			ResourceGroupTag: diagCtx.resourceGroupTag,
+			WaitForTxn:       waitForTxn,
+		})
 	}
 	d.lock.Unlock()
 	return err
@@ -103,9 +124,26 @@ func (d *Detector) doDetect(nowTime time.Time, sourceTxn, waitForTxn uint64) *Er
 			continue
 		}
 		if keyHashPair.txn == sourceTxn {
-			return &ErrDeadlock{DeadlockKeyHash: keyHashPair.keyHash}
+			return &ErrDeadlock{DeadlockKeyHash: keyHashPair.keyHash,
+				WaitChain: []*deadlockPB.WaitForEntry{
+					{
+						Txn:              waitForTxn,
+						Key:              keyHashPair.diagCtx.key,
+						KeyHash:          keyHashPair.keyHash,
+						ResourceGroupTag: keyHashPair.diagCtx.resourceGroupTag,
+						WaitForTxn:       keyHashPair.txn,
+					},
+				},
+			}
 		}
 		if err := d.doDetect(nowTime, sourceTxn, keyHashPair.txn); err != nil {
+			err.WaitChain = append(err.WaitChain, &deadlockPB.WaitForEntry{
+				Txn:              waitForTxn,
+				Key:              keyHashPair.diagCtx.key,
+				KeyHash:          keyHashPair.keyHash,
+				ResourceGroupTag: keyHashPair.diagCtx.resourceGroupTag,
+				WaitForTxn:       keyHashPair.txn,
+			})
 			return err
 		}
 	}
@@ -115,9 +153,9 @@ func (d *Detector) doDetect(nowTime time.Time, sourceTxn, waitForTxn uint64) *Er
 	return nil
 }
 
-func (d *Detector) register(sourceTxn, waitForTxn, keyHash uint64) {
+func (d *Detector) register(sourceTxn, waitForTxn, keyHash uint64, diagCtx diagnosticContext) {
 	val := d.waitForMap[sourceTxn]
-	pair := txnKeyHashPair{txn: waitForTxn, keyHash: keyHash, registerTime: time.Now()}
+	pair := txnKeyHashPair{txn: waitForTxn, keyHash: keyHash, registerTime: time.Now(), diagCtx: diagCtx}
 	if val == nil {
 		newList := &txnList{txns: list.New()}
 		newList.txns.PushBack(&pair)
diff --git a/store/mockstore/unistore/tikv/detector_test.go b/store/mockstore/unistore/tikv/detector_test.go
index 1768cc377ec7c..b0d3a074ff840 100644
--- a/store/mockstore/unistore/tikv/detector_test.go
+++ b/store/mockstore/unistore/tikv/detector_test.go
@@ -31,6 +31,7 @@ import (
 	"time"
 
 	. "github.com/pingcap/check"
+	deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
 )
 
 func TestT(t *testing.T) {
@@ -42,19 +43,38 @@ var _ = Suite(&testDeadlockSuite{})
 type testDeadlockSuite struct{}
 
 func (s *testDeadlockSuite) TestDeadlock(c *C) {
+	makeDiagCtx := func(key string, resourceGroupTag string) diagnosticContext {
+		return diagnosticContext{
+			key:              []byte(key),
+			resourceGroupTag: []byte(resourceGroupTag),
+		}
+	}
+	checkWaitChainEntry := func(entry *deadlockPB.WaitForEntry, txn, waitForTxn uint64, key, resourceGroupTag string) {
+		c.Assert(entry.Txn, Equals, txn)
+		c.Assert(entry.WaitForTxn, Equals, waitForTxn)
+		c.Assert(string(entry.Key), Equals, key)
+		c.Assert(string(entry.ResourceGroupTag), Equals, resourceGroupTag)
+	}
+
 	ttl := 50 * time.Millisecond
 	expireInterval := 100 * time.Millisecond
 	urgentSize := uint64(1)
 	detector := NewDetector(ttl, urgentSize, expireInterval)
-	err := detector.Detect(1, 2, 100)
+	err := detector.Detect(1, 2, 100, makeDiagCtx("k1", "tag1"))
 	c.Assert(err, IsNil)
 	c.Assert(detector.totalSize, Equals, uint64(1))
-	err = detector.Detect(2, 3, 200)
+	err = detector.Detect(2, 3, 200, makeDiagCtx("k2", "tag2"))
 	c.Assert(err, IsNil)
 	c.Assert(detector.totalSize, Equals, uint64(2))
-	err = detector.Detect(3, 1, 300)
+	err = detector.Detect(3, 1, 300, makeDiagCtx("k3", "tag3"))
 	c.Assert(err, NotNil)
 	c.Assert(err.Error(), Equals, fmt.Sprintf("deadlock"))
+	c.Assert(len(err.WaitChain), Equals, 3)
+	// The order of entries in the wait chain is specific: each item is waiting for the next one.
+	checkWaitChainEntry(err.WaitChain[0], 1, 2, "k1", "tag1")
+	checkWaitChainEntry(err.WaitChain[1], 2, 3, "k2", "tag2")
+	checkWaitChainEntry(err.WaitChain[2], 3, 1, "k3", "tag3")
+
 	c.Assert(detector.totalSize, Equals, uint64(2))
 	detector.CleanUp(2)
 	list2 := detector.waitForMap[2]
@@ -62,20 +82,21 @@ func (s *testDeadlockSuite) TestDeadlock(c *C) {
 	c.Assert(detector.totalSize, Equals, uint64(1))
 
 	// After cycle is broken, no deadlock now.
-	err = detector.Detect(3, 1, 300)
+	diagCtx := diagnosticContext{}
+	err = detector.Detect(3, 1, 300, diagCtx)
 	c.Assert(err, IsNil)
 	list3 := detector.waitForMap[3]
 	c.Assert(list3.txns.Len(), Equals, 1)
 	c.Assert(detector.totalSize, Equals, uint64(2))
 
 	// Different keyHash grows the list.
-	err = detector.Detect(3, 1, 400)
+	err = detector.Detect(3, 1, 400, diagCtx)
 	c.Assert(err, IsNil)
 	c.Assert(list3.txns.Len(), Equals, 2)
 	c.Assert(detector.totalSize, Equals, uint64(3))
 
 	// Same waitFor and key hash doesn't grow the list.
-	err = detector.Detect(3, 1, 400)
+	err = detector.Detect(3, 1, 400, diagCtx)
 	c.Assert(err, IsNil)
 	c.Assert(list3.txns.Len(), Equals, 2)
 	c.Assert(detector.totalSize, Equals, uint64(3))
@@ -90,7 +111,7 @@ func (s *testDeadlockSuite) TestDeadlock(c *C) {
 
 	// after 100ms, all entries expired, detect non exist edges
 	time.Sleep(100 * time.Millisecond)
-	err = detector.Detect(100, 200, 100)
+	err = detector.Detect(100, 200, 100, diagCtx)
 	c.Assert(err, IsNil)
 	c.Assert(detector.totalSize, Equals, uint64(1))
 	c.Assert(len(detector.waitForMap), Equals, 1)
@@ -98,7 +119,7 @@ func (s *testDeadlockSuite) TestDeadlock(c *C) {
 	// expired entry should not report deadlock, detect will remove this entry
 	// not dependent on expire check interval
 	time.Sleep(60 * time.Millisecond)
-	err = detector.Detect(200, 100, 200)
+	err = detector.Detect(200, 100, 200, diagCtx)
 	c.Assert(err, IsNil)
 	c.Assert(detector.totalSize, Equals, uint64(1))
 	c.Assert(len(detector.waitForMap), Equals, 1)
diff --git a/store/mockstore/unistore/tikv/errors.go b/store/mockstore/unistore/tikv/errors.go
index 01d28fb73c896..98a70951871d5 100644
--- a/store/mockstore/unistore/tikv/errors.go
+++ b/store/mockstore/unistore/tikv/errors.go
@@ -16,6 +16,7 @@ package tikv
 
 import (
 	"fmt"
+	deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/tidb/store/mockstore/unistore/tikv/mvcc"
 )
@@ -90,6 +91,7 @@ type ErrDeadlock struct {
 	LockKey         []byte
 	LockTS          uint64
 	DeadlockKeyHash uint64
+	WaitChain       []*deadlockPB.WaitForEntry
 }
 
 func (e ErrDeadlock) Error() string {
diff --git a/store/mockstore/unistore/tikv/mvcc.go b/store/mockstore/unistore/tikv/mvcc.go
index 4e3eb4f7d7df8..fe5a75b549945 100644
--- a/store/mockstore/unistore/tikv/mvcc.go
+++ b/store/mockstore/unistore/tikv/mvcc.go
@@ -239,7 +239,11 @@ func (store *MVCCStore) PessimisticLock(reqCtx *requestCtx, req *kvrpcpb.Pessimi
 	for _, m := range mutations {
 		lock, err := store.checkConflictInLockStore(reqCtx, m, startTS)
 		if err != nil {
-			return store.handleCheckPessimisticErr(startTS, err, req.IsFirstLock, req.WaitTimeout)
+			var resourceGroupTag []byte = nil
+			if req.Context != nil {
+				resourceGroupTag = req.Context.ResourceGroupTag
+			}
+			return store.handleCheckPessimisticErr(startTS, err, req.IsFirstLock, req.WaitTimeout, m.Key, resourceGroupTag)
 		}
 		if lock != nil {
 			if lock.Op != uint8(kvrpcpb.Op_PessimisticLock) {
@@ -533,11 +537,13 @@ func (store *MVCCStore) CheckSecondaryLocks(reqCtx *requestCtx, keys [][]byte, s
 func (store *MVCCStore) normalizeWaitTime(lockWaitTime int64) time.Duration {
 	if lockWaitTime > store.conf.PessimisticTxn.WaitForLockTimeout {
 		lockWaitTime = store.conf.PessimisticTxn.WaitForLockTimeout
+	} else if lockWaitTime == 0 {
+		lockWaitTime = store.conf.PessimisticTxn.WaitForLockTimeout
 	}
 	return time.Duration(lockWaitTime) * time.Millisecond
 }
 
-func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isFirstLock bool, lockWaitTime int64) (*lockwaiter.Waiter, error) {
+func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isFirstLock bool, lockWaitTime int64, key []byte, resourceGroupTag []byte) (*lockwaiter.Waiter, error) {
 	if locked, ok := err.(*ErrLocked); ok {
 		if lockWaitTime != lockwaiter.LockNoWait {
 			keyHash := farm.Fingerprint64(locked.Key)
@@ -546,7 +552,7 @@ func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isF
 			log.S().Debugf("%d blocked by %d on key %d", startTS, lock.StartTS, keyHash)
 			waiter := store.lockWaiterManager.NewWaiter(startTS, lock.StartTS, keyHash, waitTimeDuration)
 			if !isFirstLock {
-				store.DeadlockDetectCli.Detect(startTS, lock.StartTS, keyHash)
+				store.DeadlockDetectCli.Detect(startTS, lock.StartTS, keyHash, key, resourceGroupTag)
 			}
 			return waiter, err
 		}
diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go
index adf3049330897..036d824a39ff9 100644
--- a/store/mockstore/unistore/tikv/server.go
+++ b/store/mockstore/unistore/tikv/server.go
@@ -217,6 +217,7 @@ func (svr *Server) KvPessimisticLock(ctx context.Context, req *kvrpcpb.Pessimist
 			LockKey:         errLocked.Key,
 			LockTS:          errLocked.Lock.StartTS,
 			DeadlockKeyHash: result.DeadlockResp.DeadlockKeyHash,
+			WaitChain:       result.DeadlockResp.WaitChain,
 		}
 		resp.Errors, resp.RegionError = convertToPBErrors(deadlockErr)
 		return resp, nil
@@ -845,11 +846,6 @@ func (svr *Server) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpc
 	return &kvrpcpb.ReadIndexResponse{}, nil
 }
 
-// GetLockWaitInfo implements implements the tikvpb.TikvServer interface.
-func (svr *Server) GetLockWaitInfo(ctx context.Context, _ *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) {
-	return &kvrpcpb.GetLockWaitInfoResponse{}, nil
-}
-
 // transaction debugger commands.
 
 // MvccGetByKey implements implements the tikvpb.TikvServer interface.
@@ -976,6 +972,11 @@ func (svr *Server) GetStoreSafeTS(context.Context, *kvrpcpb.StoreSafeTSRequest)
 	return &kvrpcpb.StoreSafeTSResponse{}, nil
 }
 
+// GetLockWaitInfo implements the tikvpb.TikvServer interface.
+func (svr *Server) GetLockWaitInfo(context.Context, *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) {
+	panic("unimplemented")
+}
+
 func convertToKeyError(err error) *kvrpcpb.KeyError {
 	if err == nil {
 		return nil
@@ -1011,6 +1012,7 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
 				LockKey:         x.LockKey,
 				LockTs:          x.LockTS,
 				DeadlockKeyHash: x.DeadlockKeyHash,
+				WaitChain:       x.WaitChain,
 			},
 		}
 	case *ErrCommitExpire:
diff --git a/store/tikv/kv/kv.go b/store/tikv/kv/kv.go
index 2b7e87ecd2e47..8ba36a749db4f 100644
--- a/store/tikv/kv/kv.go
+++ b/store/tikv/kv/kv.go
@@ -27,4 +27,5 @@ type LockCtx struct {
 	ValuesLock       sync.Mutex
 	LockExpired      *uint32
 	Stats            *util.LockKeysDetails
+	ResourceGroupTag []byte
 }
diff --git a/store/tikv/pessimistic.go b/store/tikv/pessimistic.go
index 445ced93ff904..2da8e93dad946 100644
--- a/store/tikv/pessimistic.go
+++ b/store/tikv/pessimistic.go
@@ -101,7 +101,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
 		WaitTimeout:  action.LockWaitTime,
 		ReturnValues: action.ReturnValues,
 		MinCommitTs:  c.forUpdateTS + 1,
-	}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
+	}, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: action.LockCtx.ResourceGroupTag})
 	lockWaitStartTime := action.WaitStartTime
 	for {
 		// if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit
diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go
index f7fd2a149060d..bbe7ff8d19479 100644
--- a/store/tikv/region_request_test.go
+++ b/store/tikv/region_request_test.go
@@ -463,10 +463,6 @@ func (s *mockTikvGrpcServer) SplitRegion(context.Context, *kvrpcpb.SplitRegionRe
 	return nil, errors.New("unreachable")
 }
 
-func (s *mockTikvGrpcServer) GetLockWaitInfo(context.Context, *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) {
-	return nil, errors.New("unreachable")
-}
-
 func (s *mockTikvGrpcServer) CoprocessorStream(*coprocessor.Request, tikvpb.Tikv_CoprocessorStreamServer) error {
 	return errors.New("unreachable")
 }
@@ -495,6 +491,10 @@ func (s *mockTikvGrpcServer) CoprocessorV2(context.Context, *coprocessor_v2.RawC
 	return nil, errors.New("unreachable")
 }
 
+func (s *mockTikvGrpcServer) GetLockWaitInfo(context.Context, *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) {
+	return nil, errors.New("unreachable")
+}
+
 func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C) {
 	// prepare a mock tikv grpc server
 	addr := "localhost:56341"
diff --git a/store/tikv/tests/lock_test.go b/store/tikv/tests/lock_test.go
index f32991877fefd..d64c1d102e6d1 100644
--- a/store/tikv/tests/lock_test.go
+++ b/store/tikv/tests/lock_test.go
@@ -19,13 +19,17 @@ import (
 	"fmt"
 	"math"
 	"runtime"
+	"sync"
 	"time"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
+	deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/tidb/store/tikv"
 	tikverr "github.com/pingcap/tidb/store/tikv/error"
+	"github.com/pingcap/tidb/store/tikv/kv"
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
 )
@@ -640,3 +644,131 @@ func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit(c *C) {
 	_, err = t3.Get(context.Background(), []byte("fb2"))
 	c.Assert(tikverr.IsErrNotFound(err), IsTrue)
 }
+
+func (s *testLockSuite) TestDeadlockReportWaitChain(c *C) {
+	// Utilities to make the test logic clear and simple.
+	type txnWrapper struct {
+		tikv.TxnProbe
+		wg sync.WaitGroup
+	}
+
+	makeLockCtx := func(txn *txnWrapper, resourceGroupTag string) *kv.LockCtx {
+		return &kv.LockCtx{
+			ForUpdateTS:      txn.StartTS(),
+			WaitStartTime:    time.Now(),
+			LockWaitTime:     1000,
+			ResourceGroupTag: []byte(resourceGroupTag),
+		}
+	}
+
+	// Prepares several transactions, each of which locks a key.
+	prepareTxns := func(num int) []*txnWrapper {
+		res := make([]*txnWrapper, 0, num)
+		for i := 0; i < num; i++ {
+			txnProbe, err := s.store.Begin()
+			c.Assert(err, IsNil)
+			txn := &txnWrapper{TxnProbe: txnProbe}
+			txn.SetPessimistic(true)
+			tag := fmt.Sprintf("tag-init%v", i)
+			key := []byte{'k', byte(i)}
+			err = txn.LockKeys(context.Background(), makeLockCtx(txn, tag), key)
+			c.Assert(err, IsNil)
+
+			res = append(res, txn)
+		}
+		return res
+	}
+
+	// Let the i-th transaction lock the key that has been locked by the j-th transaction.
+	tryLock := func(txns []*txnWrapper, i int, j int) error {
+		c.Logf("txn %v try locking %v", i, j)
+		txn := txns[i]
+		tag := fmt.Sprintf("tag-%v-%v", i, j)
+		key := []byte{'k', byte(j)}
+		return txn.LockKeys(context.Background(), makeLockCtx(txn, tag), key)
+	}
+
+	// Asserts the i-th transaction waits for the j-th transaction.
+	makeWaitFor := func(txns []*txnWrapper, i int, j int) {
+		txns[i].wg.Add(1)
+		go func() {
+			defer txns[i].wg.Done()
+			err := tryLock(txns, i, j)
+			// After the lock being waited for is released, the transaction returns a WriteConflict error
+			// unconditionally, which is by design.
+			c.Assert(err, NotNil)
+			c.Logf("txn %v wait for %v finished, err: %s", i, j, err.Error())
+			_, ok := errors.Cause(err).(*tikverr.ErrWriteConflict)
+			c.Assert(ok, IsTrue)
+		}()
+	}
+
+	waitAndRollback := func(txns []*txnWrapper, i int) {
+		// It's expected that each transaction should be rolled back after its blocker, so that `Rollback` will not
+		// run when there's concurrent `LockKeys` running.
+		// If it's blocked on the `Wait` forever, it means the transaction's blocker is not rolled back.
+		c.Logf("rollback txn %v", i)
+		txns[i].wg.Wait()
+		err := txns[i].Rollback()
+		c.Assert(err, IsNil)
+	}
+
+	// Check that the given WaitForEntry is caused by txn[i] waiting for txn[j].
+	checkWaitChainEntry := func(txns []*txnWrapper, entry *deadlockPB.WaitForEntry, i, j int) {
+		c.Assert(entry.Txn, Equals, txns[i].StartTS())
+		c.Assert(entry.WaitForTxn, Equals, txns[j].StartTS())
+		c.Assert(entry.Key, BytesEquals, []byte{'k', byte(j)})
+		c.Assert(string(entry.ResourceGroupTag), Equals, fmt.Sprintf("tag-%v-%v", i, j))
+	}
+
+	c.Log("test case 1: 1->0->1")
+
+	txns := prepareTxns(2)
+
+	makeWaitFor(txns, 0, 1)
+	// Sleep for a while to make sure it has been blocked.
+	time.Sleep(time.Millisecond * 100)
+
+	// The second transaction tries locking the first one's key and encounters a deadlock error.
+	err := tryLock(txns, 1, 0)
+	c.Assert(err, NotNil)
+	dl, ok := errors.Cause(err).(*tikverr.ErrDeadlock)
+	c.Assert(ok, IsTrue)
+
+	waitChain := dl.GetWaitChain()
+	c.Assert(len(waitChain), Equals, 2)
+	checkWaitChainEntry(txns, waitChain[0], 0, 1)
+	checkWaitChainEntry(txns, waitChain[1], 1, 0)
+
+	// Each transaction should be rolled back after its blocker has been rolled back.
+	waitAndRollback(txns, 1)
+	waitAndRollback(txns, 0)
+
+	c.Log("test case 2: 3->2->0->1->3")
+	txns = prepareTxns(4)
+
+	makeWaitFor(txns, 0, 1)
+	makeWaitFor(txns, 2, 0)
+	makeWaitFor(txns, 1, 3)
+	// Sleep for a while to make sure it has been blocked.
+	time.Sleep(time.Millisecond * 100)
+
+	err = tryLock(txns, 3, 2)
+	c.Assert(err, NotNil)
+	dl, ok = errors.Cause(err).(*tikverr.ErrDeadlock)
+	c.Assert(ok, IsTrue)
+
+	waitChain = dl.GetWaitChain()
+	c.Assert(len(waitChain), Equals, 4)
+	c.Logf("wait chain: \n** %v\n**%v\n**%v\n**%v\n", waitChain[0], waitChain[1], waitChain[2], waitChain[3])
+	checkWaitChainEntry(txns, waitChain[0], 2, 0)
+	checkWaitChainEntry(txns, waitChain[1], 0, 1)
+	checkWaitChainEntry(txns, waitChain[2], 1, 3)
+	checkWaitChainEntry(txns, waitChain[3], 3, 2)
+
+	// Each transaction should be rolled back after its blocker has been rolled back.
+	waitAndRollback(txns, 3)
+	waitAndRollback(txns, 1)
+	waitAndRollback(txns, 0)
+	waitAndRollback(txns, 2)
+}
diff --git a/store/tikv/txn.go b/store/tikv/txn.go
index cba091cbdc8da..20bf0491ed294 100644
--- a/store/tikv/txn.go
+++ b/store/tikv/txn.go
@@ -613,15 +613,18 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
 			// If there is only 1 key and lock fails, no need to do pessimistic rollback.
 			if len(keys) > 1 || keyMayBeLocked {
 				wg := txn.asyncPessimisticRollback(ctx, keys)
-				if dl, ok := errors.Cause(err).(*tikverr.ErrDeadlock); ok && hashInKeys(dl.DeadlockKeyHash, keys) {
-					dl.IsRetryable = true
-					// Wait for the pessimistic rollback to finish before we retry the statement.
-					wg.Wait()
-					// Sleep a little, wait for the other transaction that blocked by this transaction to acquire the lock.
-					time.Sleep(time.Millisecond * 5)
-					failpoint.Inject("SingleStmtDeadLockRetrySleep", func() {
-						time.Sleep(300 * time.Millisecond)
-					})
+				if dl, ok := errors.Cause(err).(*tikverr.ErrDeadlock); ok {
+					logutil.Logger(ctx).Debug("deadlock error received", zap.Uint64("startTS", txn.startTS), zap.Stringer("deadlockInfo", dl))
+					if hashInKeys(dl.DeadlockKeyHash, keys) {
+						dl.IsRetryable = true
+						// Wait for the pessimistic rollback to finish before we retry the statement.
+						wg.Wait()
+						// Sleep a little, wait for the other transaction that blocked by this transaction to acquire the lock.
+						time.Sleep(time.Millisecond * 5)
+						failpoint.Inject("SingleStmtDeadLockRetrySleep", func() {
+							time.Sleep(300 * time.Millisecond)
+						})
+					}
 				}
 			}
 			if assignedPrimaryKey {
diff --git a/util/resourcegrouptag/resource_group_tag.go b/util/resourcegrouptag/resource_group_tag.go
new file mode 100644
index 0000000000000..cacbf574b91fb
--- /dev/null
+++ b/util/resourcegrouptag/resource_group_tag.go
@@ -0,0 +1,85 @@
+package resourcegrouptag
+
+import (
+	"encoding/hex"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+const (
+	resourceGroupTagPrefixSQLDigest = byte(1)
+)
+
+// EncodeResourceGroupTag encodes sqlDigest into a resource group tag.
+// A resource group tag can be carried in the Context field of TiKV requests, which is a byte array, and sent to TiKV as
+// diagnostic information. Currently it contains only the SQL Digest, and the codec method is naive but extendable.
+// This function doesn't return an error. When there's some error, which can only be caused by an unexpected format of
+// the arguments, it simply returns an empty result.
+// The format:
+// +-----------+-----------------------+----------------------------+---------------+----------------+----
+// | version=1 | field1 prefix (1byte) | field1 content (var bytes) | field2 prefix | field2 content | ...
+// +-----------+-----------------------+----------------------------+---------------+----------------+----
+// The `version` section marks the codec version, which makes it easier to change the format in the future.
+// Each field starts with a byte to mark what field it is, and the length of the content depends on the field's
+// definition.
+// Currently there's only one field (SQL Digest), and its content starts with a byte `B` describing its length,
+// followed by exactly `B` bytes.
+func EncodeResourceGroupTag(sqlDigest string) []byte {
+	if len(sqlDigest) == 0 {
+		return nil
+	}
+	if len(sqlDigest) >= 512 {
+		logutil.BgLogger().Warn("failed to encode sql digest to resource group tag: length too long", zap.String("sqlDigest", sqlDigest))
+		return nil
+	}
+
+	res := make([]byte, 3+len(sqlDigest)/2)
+
+	const encodingVersion = 1
+	res[0] = encodingVersion
+
+	res[1] = resourceGroupTagPrefixSQLDigest
+	// The SQL Digest is expected to be a hex string. Convert it back to bytes to save half of the memory.
+	res[2] = byte(len(sqlDigest) / 2)
+	_, err := hex.Decode(res[3:], []byte(sqlDigest))
+	if err != nil {
+		logutil.BgLogger().Warn("failed to encode sql digest to resource group tag: invalid hex string", zap.String("sqlDigest", sqlDigest))
+		return nil
+	}
+
+	return res
+}
+
+// DecodeResourceGroupTag decodes a resource group tag into various information contained in it. Currently it contains
+// only the SQL Digest.
+func DecodeResourceGroupTag(data []byte) (sqlDigest string, err error) {
+	if len(data) == 0 {
+		return "", nil
+	}
+
+	encodingVersion := data[0]
+	if encodingVersion != 1 {
+		return "", errors.Errorf("unsupported resource group tag version %v", data[0])
+	}
+	rem := data[1:]
+
+	for len(rem) > 0 {
+		switch rem[0] {
+		case resourceGroupTagPrefixSQLDigest:
+			// There must be one more byte at rem[1] to represent the content's length, and the remaining bytes should
+			// not be shorter than the length specified by rem[1].
+			if len(rem) < 2 || len(rem)-2 < int(rem[1]) {
+				return "", errors.Errorf("cannot parse resource group tag: field length mismatch, tag: %v", hex.EncodeToString(data))
+			}
+			fieldLen := int(rem[1])
+			sqlDigest = hex.EncodeToString(rem[2 : 2+fieldLen])
+			rem = rem[2+fieldLen:]
+		default:
+			return "", errors.Errorf("resource group tag field not recognized, prefix: %v, tag: %v", rem[0], hex.EncodeToString(data))
+		}
+	}
+
+	return
+}
diff --git a/util/resourcegrouptag/resource_group_tag_test.go b/util/resourcegrouptag/resource_group_tag_test.go
new file mode 100644
index 0000000000000..a979b92fce315
--- /dev/null
+++ b/util/resourcegrouptag/resource_group_tag_test.go
@@ -0,0 +1,111 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package resourcegrouptag
+
+import (
+	"math/rand"
+	"testing"
+
+	. "github.com/pingcap/check"
+)
+
+type testUtilsSuite struct{}
+
+var _ = Suite(&testUtilsSuite{})
+
+func TestT(t *testing.T) {
+	TestingT(t)
+}
+
+func (s *testUtilsSuite) TestResourceGroupTagEncoding(c *C) {
+	sqlDigest := ""
+	tag := EncodeResourceGroupTag(sqlDigest)
+	c.Assert(len(tag), Equals, 0)
+	decodedSQLDigest, err := DecodeResourceGroupTag(tag)
+	c.Assert(err, IsNil)
+	c.Assert(len(decodedSQLDigest), Equals, 0)
+
+	sqlDigest = "aa"
+	tag = EncodeResourceGroupTag(sqlDigest)
+	// version(1) + prefix(1) + length(1) + content(2hex -> 1byte)
+	c.Assert(len(tag), Equals, 4)
+	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, IsNil)
+	c.Assert(decodedSQLDigest, Equals, sqlDigest)
+
+	sqlDigest = genRandHex(64)
+	tag = EncodeResourceGroupTag(sqlDigest)
+	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, IsNil)
+	c.Assert(decodedSQLDigest, Equals, sqlDigest)
+
+	sqlDigest = genRandHex(510)
+	tag = EncodeResourceGroupTag(sqlDigest)
+	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, IsNil)
+	c.Assert(decodedSQLDigest, Equals, sqlDigest)
+
+	// The max supported length is 255 bytes (510 hex digits).
+	sqlDigest = genRandHex(512)
+	tag = EncodeResourceGroupTag(sqlDigest)
+	c.Assert(len(tag), Equals, 0)
+
+	// A hex string can't have odd length.
+	sqlDigest = genRandHex(15)
+	tag = EncodeResourceGroupTag(sqlDigest)
+	c.Assert(len(tag), Equals, 0)
+
+	// Non-hexadecimal characters are invalid.
+	sqlDigest = "aabbccddgg"
+	tag = EncodeResourceGroupTag(sqlDigest)
+	c.Assert(len(tag), Equals, 0)
+
+	// A tag should start with a supported version.
+	tag = []byte("\x00")
+	_, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, NotNil)
+
+	// The fields should have the format `[prefix, length, content...]`; otherwise decoding it should return an error.
+	tag = []byte("\x01\x01")
+	_, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, NotNil)
+
+	tag = []byte("\x01\x01\x02")
+	_, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, NotNil)
+
+	tag = []byte("\x01\x01\x02AB")
+	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, IsNil)
+	c.Assert(decodedSQLDigest, Equals, "4142")
+
+	tag = []byte("\x01\x01\x00")
+	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, IsNil)
+	c.Assert(len(decodedSQLDigest), Equals, 0)
+
+	// Unsupported field
+	tag = []byte("\x01\x99")
+	_, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, NotNil)
+}
+
+func genRandHex(length int) string {
+	const chars = "0123456789abcdef"
+	res := make([]byte, length)
+	for i := 0; i < length; i++ {
+		res[i] = chars[rand.Intn(len(chars))]
+	}
+	return string(res)
+}
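For reference, outside the diff itself: a minimal sketch of how the new util/resourcegrouptag helpers introduced above are expected to round-trip a digest. The digest value here is made up for illustration; in the change it comes from seVars.StmtCtx.SQLDigest() in newLockCtx.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/util/resourcegrouptag"
)

func main() {
	// A SQL digest is a hex string; EncodeResourceGroupTag packs it as
	// version(1 byte) | field prefix(1 byte) | length(1 byte) | raw digest bytes.
	digest := "aabbccdd" // hypothetical digest, for illustration only
	tag := resourcegrouptag.EncodeResourceGroupTag(digest)
	fmt.Printf("%x\n", tag) // 010104aabbccdd

	decoded, err := resourcegrouptag.DecodeResourceGroupTag(tag)
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded == digest) // true
}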