Skip to content

Commit

Permalink
executor, store: Pass the SQL digest down to pessimistic lock request (
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta authored May 13, 2021
1 parent aecff1c commit 5d40ea4
Show file tree
Hide file tree
Showing 14 changed files with 465 additions and 41 deletions.
3 changes: 3 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/resourcegrouptag"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -971,6 +972,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.LockCtx {
_, sqlDigest := seVars.StmtCtx.SQLDigest()
return &tikvstore.LockCtx{
Killed: &seVars.Killed,
ForUpdateTS: seVars.TxnCtx.GetForUpdateTS(),
Expand All @@ -980,6 +982,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.Loc
LockKeysDuration: &seVars.StmtCtx.LockKeysDuration,
LockKeysCount: &seVars.StmtCtx.LockKeysCount,
LockExpired: &seVars.TxnCtx.LockExpire,
ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(sqlDigest),
}
}

Expand Down
32 changes: 26 additions & 6 deletions store/mockstore/unistore/tikv/deadlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ type DetectorServer struct {
func (ds *DetectorServer) Detect(req *deadlockPb.DeadlockRequest) *deadlockPb.DeadlockResponse {
switch req.Tp {
case deadlockPb.DeadlockRequestType_Detect:
err := ds.Detector.Detect(req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash)
err := ds.Detector.Detect(req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash, diagnosticContext{
key: req.Entry.Key,
resourceGroupTag: req.Entry.ResourceGroupTag,
})
if err != nil {
resp := convertErrToResp(err, req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash)
return resp
Expand Down Expand Up @@ -178,30 +181,35 @@ func (dt *DetectorClient) recvLoop(streamCli deadlockPb.Deadlock_DetectClient) {
}

func (dt *DetectorClient) handleRemoteTask(requestType deadlockPb.DeadlockRequestType,
txnTs uint64, waitForTxnTs uint64, keyHash uint64) {
txnTs uint64, waitForTxnTs uint64, keyHash uint64, diagCtx diagnosticContext) {
detectReq := &deadlockPb.DeadlockRequest{}
detectReq.Tp = requestType
detectReq.Entry.Txn = txnTs
detectReq.Entry.WaitForTxn = waitForTxnTs
detectReq.Entry.KeyHash = keyHash
detectReq.Entry.Key = diagCtx.key
detectReq.Entry.ResourceGroupTag = diagCtx.resourceGroupTag
dt.sendCh <- detectReq
}

// CleanUp processes cleaup task on local detector
// user interfaces
func (dt *DetectorClient) CleanUp(startTs uint64) {
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUp, startTs, 0, 0)
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUp, startTs, 0, 0, diagnosticContext{})
}

// CleanUpWaitFor cleans up the specific wait edge in detector's wait map
func (dt *DetectorClient) CleanUpWaitFor(txnTs, waitForTxn, keyHash uint64) {
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUpWaitFor, txnTs, waitForTxn, keyHash)
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUpWaitFor, txnTs, waitForTxn, keyHash, diagnosticContext{})
}

// Detect post the detection request to local deadlock detector or remote first region leader,
// the caller should use `waiter.ch` to receive possible deadlock response
func (dt *DetectorClient) Detect(txnTs uint64, waitForTxnTs uint64, keyHash uint64) {
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_Detect, txnTs, waitForTxnTs, keyHash)
func (dt *DetectorClient) Detect(txnTs uint64, waitForTxnTs uint64, keyHash uint64, key []byte, resourceGroupTag []byte) {
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_Detect, txnTs, waitForTxnTs, keyHash, diagnosticContext{
key: key,
resourceGroupTag: resourceGroupTag,
})
}

// convertErrToResp converts `ErrDeadlock` to `DeadlockResponse` proto type
Expand All @@ -213,6 +221,18 @@ func convertErrToResp(errDeadlock *ErrDeadlock, txnTs, waitForTxnTs, keyHash uin
resp := &deadlockPb.DeadlockResponse{}
resp.Entry = entry
resp.DeadlockKeyHash = errDeadlock.DeadlockKeyHash

resp.WaitChain = make([]*deadlockPb.WaitForEntry, 0, len(errDeadlock.WaitChain))
for _, item := range errDeadlock.WaitChain {
resp.WaitChain = append(resp.WaitChain, &deadlockPb.WaitForEntry{
Txn: item.Txn,
WaitForTxn: item.WaitForTxn,
KeyHash: item.KeyHash,
Key: item.Key,
ResourceGroupTag: item.ResourceGroupTag,
})
}

return resp
}

Expand Down
48 changes: 43 additions & 5 deletions store/mockstore/unistore/tikv/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sync"
"time"

deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/log"
"go.uber.org/zap"
)
Expand All @@ -54,6 +55,12 @@ type txnKeyHashPair struct {
txn uint64
keyHash uint64
registerTime time.Time
diagCtx diagnosticContext
}

type diagnosticContext struct {
key []byte
resourceGroupTag []byte
}

func (p *txnKeyHashPair) isExpired(ttl time.Duration, nowTime time.Time) bool {
Expand All @@ -75,13 +82,27 @@ func NewDetector(ttl time.Duration, urgentSize uint64, expireInterval time.Durat
}

// Detect detects deadlock for the sourceTxn on a locked key.
func (d *Detector) Detect(sourceTxn, waitForTxn, keyHash uint64) *ErrDeadlock {
func (d *Detector) Detect(sourceTxn, waitForTxn, keyHash uint64, diagCtx diagnosticContext) *ErrDeadlock {
d.lock.Lock()
nowTime := time.Now()
d.activeExpire(nowTime)
err := d.doDetect(nowTime, sourceTxn, waitForTxn)
if err == nil {
d.register(sourceTxn, waitForTxn, keyHash)
d.register(sourceTxn, waitForTxn, keyHash, diagCtx)
} else {
// Reverse the wait chain so that the order will be each one waiting for the next one, and append the current
// entry that finally caused the deadlock.
for i := 0; i < len(err.WaitChain)/2; i++ {
j := len(err.WaitChain) - i - 1
err.WaitChain[i], err.WaitChain[j] = err.WaitChain[j], err.WaitChain[i]
}
err.WaitChain = append(err.WaitChain, &deadlockPB.WaitForEntry{
Txn: sourceTxn,
Key: diagCtx.key,
KeyHash: keyHash,
ResourceGroupTag: diagCtx.resourceGroupTag,
WaitForTxn: waitForTxn,
})
}
d.lock.Unlock()
return err
Expand All @@ -103,9 +124,26 @@ func (d *Detector) doDetect(nowTime time.Time, sourceTxn, waitForTxn uint64) *Er
continue
}
if keyHashPair.txn == sourceTxn {
return &ErrDeadlock{DeadlockKeyHash: keyHashPair.keyHash}
return &ErrDeadlock{DeadlockKeyHash: keyHashPair.keyHash,
WaitChain: []*deadlockPB.WaitForEntry{
{
Txn: waitForTxn,
Key: keyHashPair.diagCtx.key,
KeyHash: keyHashPair.keyHash,
ResourceGroupTag: keyHashPair.diagCtx.resourceGroupTag,
WaitForTxn: keyHashPair.txn,
},
},
}
}
if err := d.doDetect(nowTime, sourceTxn, keyHashPair.txn); err != nil {
err.WaitChain = append(err.WaitChain, &deadlockPB.WaitForEntry{
Txn: waitForTxn,
Key: keyHashPair.diagCtx.key,
KeyHash: keyHashPair.keyHash,
ResourceGroupTag: keyHashPair.diagCtx.resourceGroupTag,
WaitForTxn: keyHashPair.txn,
})
return err
}
}
Expand All @@ -115,9 +153,9 @@ func (d *Detector) doDetect(nowTime time.Time, sourceTxn, waitForTxn uint64) *Er
return nil
}

func (d *Detector) register(sourceTxn, waitForTxn, keyHash uint64) {
func (d *Detector) register(sourceTxn, waitForTxn, keyHash uint64, diagCtx diagnosticContext) {
val := d.waitForMap[sourceTxn]
pair := txnKeyHashPair{txn: waitForTxn, keyHash: keyHash, registerTime: time.Now()}
pair := txnKeyHashPair{txn: waitForTxn, keyHash: keyHash, registerTime: time.Now(), diagCtx: diagCtx}
if val == nil {
newList := &txnList{txns: list.New()}
newList.txns.PushBack(&pair)
Expand Down
37 changes: 29 additions & 8 deletions store/mockstore/unistore/tikv/detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

. "github.com/pingcap/check"
deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
)

func TestT(t *testing.T) {
Expand All @@ -42,40 +43,60 @@ var _ = Suite(&testDeadlockSuite{})
type testDeadlockSuite struct{}

func (s *testDeadlockSuite) TestDeadlock(c *C) {
makeDiagCtx := func(key string, resourceGroupTag string) diagnosticContext {
return diagnosticContext{
key: []byte(key),
resourceGroupTag: []byte(resourceGroupTag),
}
}
checkWaitChainEntry := func(entry *deadlockPB.WaitForEntry, txn, waitForTxn uint64, key, resourceGroupTag string) {
c.Assert(entry.Txn, Equals, txn)
c.Assert(entry.WaitForTxn, Equals, waitForTxn)
c.Assert(string(entry.Key), Equals, key)
c.Assert(string(entry.ResourceGroupTag), Equals, resourceGroupTag)
}

ttl := 50 * time.Millisecond
expireInterval := 100 * time.Millisecond
urgentSize := uint64(1)
detector := NewDetector(ttl, urgentSize, expireInterval)
err := detector.Detect(1, 2, 100)
err := detector.Detect(1, 2, 100, makeDiagCtx("k1", "tag1"))
c.Assert(err, IsNil)
c.Assert(detector.totalSize, Equals, uint64(1))
err = detector.Detect(2, 3, 200)
err = detector.Detect(2, 3, 200, makeDiagCtx("k2", "tag2"))
c.Assert(err, IsNil)
c.Assert(detector.totalSize, Equals, uint64(2))
err = detector.Detect(3, 1, 300)
err = detector.Detect(3, 1, 300, makeDiagCtx("k3", "tag3"))
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, fmt.Sprintf("deadlock"))
c.Assert(len(err.WaitChain), Equals, 3)
// The order of entries in the wait chain is specific: each item is waiting for the next one.
checkWaitChainEntry(err.WaitChain[0], 1, 2, "k1", "tag1")
checkWaitChainEntry(err.WaitChain[1], 2, 3, "k2", "tag2")
checkWaitChainEntry(err.WaitChain[2], 3, 1, "k3", "tag3")

c.Assert(detector.totalSize, Equals, uint64(2))
detector.CleanUp(2)
list2 := detector.waitForMap[2]
c.Assert(list2, IsNil)
c.Assert(detector.totalSize, Equals, uint64(1))

// After cycle is broken, no deadlock now.
err = detector.Detect(3, 1, 300)
diagCtx := diagnosticContext{}
err = detector.Detect(3, 1, 300, diagCtx)
c.Assert(err, IsNil)
list3 := detector.waitForMap[3]
c.Assert(list3.txns.Len(), Equals, 1)
c.Assert(detector.totalSize, Equals, uint64(2))

// Different keyHash grows the list.
err = detector.Detect(3, 1, 400)
err = detector.Detect(3, 1, 400, diagCtx)
c.Assert(err, IsNil)
c.Assert(list3.txns.Len(), Equals, 2)
c.Assert(detector.totalSize, Equals, uint64(3))

// Same waitFor and key hash doesn't grow the list.
err = detector.Detect(3, 1, 400)
err = detector.Detect(3, 1, 400, diagCtx)
c.Assert(err, IsNil)
c.Assert(list3.txns.Len(), Equals, 2)
c.Assert(detector.totalSize, Equals, uint64(3))
Expand All @@ -90,15 +111,15 @@ func (s *testDeadlockSuite) TestDeadlock(c *C) {

// after 100ms, all entries expired, detect non exist edges
time.Sleep(100 * time.Millisecond)
err = detector.Detect(100, 200, 100)
err = detector.Detect(100, 200, 100, diagCtx)
c.Assert(err, IsNil)
c.Assert(detector.totalSize, Equals, uint64(1))
c.Assert(len(detector.waitForMap), Equals, 1)

// expired entry should not report deadlock, detect will remove this entry
// not dependent on expire check interval
time.Sleep(60 * time.Millisecond)
err = detector.Detect(200, 100, 200)
err = detector.Detect(200, 100, 200, diagCtx)
c.Assert(err, IsNil)
c.Assert(detector.totalSize, Equals, uint64(1))
c.Assert(len(detector.waitForMap), Equals, 1)
Expand Down
2 changes: 2 additions & 0 deletions store/mockstore/unistore/tikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tikv
import (
"fmt"

deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/store/mockstore/unistore/tikv/mvcc"
)
Expand Down Expand Up @@ -90,6 +91,7 @@ type ErrDeadlock struct {
LockKey []byte
LockTS uint64
DeadlockKeyHash uint64
WaitChain []*deadlockPB.WaitForEntry
}

func (e ErrDeadlock) Error() string {
Expand Down
12 changes: 9 additions & 3 deletions store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,11 @@ func (store *MVCCStore) PessimisticLock(reqCtx *requestCtx, req *kvrpcpb.Pessimi
for _, m := range mutations {
lock, err := store.checkConflictInLockStore(reqCtx, m, startTS)
if err != nil {
return store.handleCheckPessimisticErr(startTS, err, req.IsFirstLock, req.WaitTimeout)
var resourceGroupTag []byte = nil
if req.Context != nil {
resourceGroupTag = req.Context.ResourceGroupTag
}
return store.handleCheckPessimisticErr(startTS, err, req.IsFirstLock, req.WaitTimeout, m.Key, resourceGroupTag)
}
if lock != nil {
if lock.Op != uint8(kvrpcpb.Op_PessimisticLock) {
Expand Down Expand Up @@ -533,11 +537,13 @@ func (store *MVCCStore) CheckSecondaryLocks(reqCtx *requestCtx, keys [][]byte, s
func (store *MVCCStore) normalizeWaitTime(lockWaitTime int64) time.Duration {
if lockWaitTime > store.conf.PessimisticTxn.WaitForLockTimeout {
lockWaitTime = store.conf.PessimisticTxn.WaitForLockTimeout
} else if lockWaitTime == 0 {
lockWaitTime = store.conf.PessimisticTxn.WaitForLockTimeout
}
return time.Duration(lockWaitTime) * time.Millisecond
}

func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isFirstLock bool, lockWaitTime int64) (*lockwaiter.Waiter, error) {
func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isFirstLock bool, lockWaitTime int64, key []byte, resourceGroupTag []byte) (*lockwaiter.Waiter, error) {
if locked, ok := err.(*ErrLocked); ok {
if lockWaitTime != lockwaiter.LockNoWait {
keyHash := farm.Fingerprint64(locked.Key)
Expand All @@ -546,7 +552,7 @@ func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isF
log.S().Debugf("%d blocked by %d on key %d", startTS, lock.StartTS, keyHash)
waiter := store.lockWaiterManager.NewWaiter(startTS, lock.StartTS, keyHash, waitTimeDuration)
if !isFirstLock {
store.DeadlockDetectCli.Detect(startTS, lock.StartTS, keyHash)
store.DeadlockDetectCli.Detect(startTS, lock.StartTS, keyHash, key, resourceGroupTag)
}
return waiter, err
}
Expand Down
12 changes: 7 additions & 5 deletions store/mockstore/unistore/tikv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (svr *Server) KvPessimisticLock(ctx context.Context, req *kvrpcpb.Pessimist
LockKey: errLocked.Key,
LockTS: errLocked.Lock.StartTS,
DeadlockKeyHash: result.DeadlockResp.DeadlockKeyHash,
WaitChain: result.DeadlockResp.WaitChain,
}
resp.Errors, resp.RegionError = convertToPBErrors(deadlockErr)
return resp, nil
Expand Down Expand Up @@ -845,11 +846,6 @@ func (svr *Server) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpc
return &kvrpcpb.ReadIndexResponse{}, nil
}

// GetLockWaitInfo implements implements the tikvpb.TikvServer interface.
func (svr *Server) GetLockWaitInfo(ctx context.Context, _ *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) {
return &kvrpcpb.GetLockWaitInfoResponse{}, nil
}

// transaction debugger commands.

// MvccGetByKey implements implements the tikvpb.TikvServer interface.
Expand Down Expand Up @@ -976,6 +972,11 @@ func (svr *Server) GetStoreSafeTS(context.Context, *kvrpcpb.StoreSafeTSRequest)
return &kvrpcpb.StoreSafeTSResponse{}, nil
}

// GetLockWaitInfo implements the tikvpb.TikvServer interface.
func (svr *Server) GetLockWaitInfo(context.Context, *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) {
panic("unimplemented")
}

func convertToKeyError(err error) *kvrpcpb.KeyError {
if err == nil {
return nil
Expand Down Expand Up @@ -1011,6 +1012,7 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
LockKey: x.LockKey,
LockTs: x.LockTS,
DeadlockKeyHash: x.DeadlockKeyHash,
WaitChain: x.WaitChain,
},
}
case *ErrCommitExpire:
Expand Down
1 change: 1 addition & 0 deletions store/tikv/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ type LockCtx struct {
ValuesLock sync.Mutex
LockExpired *uint32
Stats *util.LockKeysDetails
ResourceGroupTag []byte
}
2 changes: 1 addition & 1 deletion store/tikv/pessimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
WaitTimeout: action.LockWaitTime,
ReturnValues: action.ReturnValues,
MinCommitTs: c.forUpdateTS + 1,
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: action.LockCtx.ResourceGroupTag})
lockWaitStartTime := action.WaitStartTime
for {
// if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit
Expand Down
Loading

0 comments on commit 5d40ea4

Please sign in to comment.