Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor, store: Pass the SQL digest down to pessimistic lock request #24380

Merged
merged 17 commits into from
May 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/resourcegrouptag"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -971,6 +972,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.LockCtx {
_, sqlDigest := seVars.StmtCtx.SQLDigest()
return &tikvstore.LockCtx{
Killed: &seVars.Killed,
ForUpdateTS: seVars.TxnCtx.GetForUpdateTS(),
Expand All @@ -980,6 +982,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.Loc
LockKeysDuration: &seVars.StmtCtx.LockKeysDuration,
LockKeysCount: &seVars.StmtCtx.LockKeysCount,
LockExpired: &seVars.TxnCtx.LockExpire,
ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(sqlDigest),
}
}

Expand Down
32 changes: 26 additions & 6 deletions store/mockstore/unistore/tikv/deadlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ type DetectorServer struct {
func (ds *DetectorServer) Detect(req *deadlockPb.DeadlockRequest) *deadlockPb.DeadlockResponse {
switch req.Tp {
case deadlockPb.DeadlockRequestType_Detect:
err := ds.Detector.Detect(req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash)
err := ds.Detector.Detect(req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash, diagnosticContext{
key: req.Entry.Key,
resourceGroupTag: req.Entry.ResourceGroupTag,
})
if err != nil {
resp := convertErrToResp(err, req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash)
return resp
Expand Down Expand Up @@ -178,30 +181,35 @@ func (dt *DetectorClient) recvLoop(streamCli deadlockPb.Deadlock_DetectClient) {
}

func (dt *DetectorClient) handleRemoteTask(requestType deadlockPb.DeadlockRequestType,
txnTs uint64, waitForTxnTs uint64, keyHash uint64) {
txnTs uint64, waitForTxnTs uint64, keyHash uint64, diagCtx diagnosticContext) {
detectReq := &deadlockPb.DeadlockRequest{}
detectReq.Tp = requestType
detectReq.Entry.Txn = txnTs
detectReq.Entry.WaitForTxn = waitForTxnTs
detectReq.Entry.KeyHash = keyHash
detectReq.Entry.Key = diagCtx.key
detectReq.Entry.ResourceGroupTag = diagCtx.resourceGroupTag
dt.sendCh <- detectReq
}

// CleanUp processes cleaup task on local detector
// user interfaces
func (dt *DetectorClient) CleanUp(startTs uint64) {
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUp, startTs, 0, 0)
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUp, startTs, 0, 0, diagnosticContext{})
}

// CleanUpWaitFor cleans up the specific wait edge in detector's wait map
func (dt *DetectorClient) CleanUpWaitFor(txnTs, waitForTxn, keyHash uint64) {
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUpWaitFor, txnTs, waitForTxn, keyHash)
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUpWaitFor, txnTs, waitForTxn, keyHash, diagnosticContext{})
}

// Detect post the detection request to local deadlock detector or remote first region leader,
// the caller should use `waiter.ch` to receive possible deadlock response
func (dt *DetectorClient) Detect(txnTs uint64, waitForTxnTs uint64, keyHash uint64) {
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_Detect, txnTs, waitForTxnTs, keyHash)
func (dt *DetectorClient) Detect(txnTs uint64, waitForTxnTs uint64, keyHash uint64, key []byte, resourceGroupTag []byte) {
dt.handleRemoteTask(deadlockPb.DeadlockRequestType_Detect, txnTs, waitForTxnTs, keyHash, diagnosticContext{
key: key,
resourceGroupTag: resourceGroupTag,
})
}

// convertErrToResp converts `ErrDeadlock` to `DeadlockResponse` proto type
Expand All @@ -213,6 +221,18 @@ func convertErrToResp(errDeadlock *ErrDeadlock, txnTs, waitForTxnTs, keyHash uin
resp := &deadlockPb.DeadlockResponse{}
resp.Entry = entry
resp.DeadlockKeyHash = errDeadlock.DeadlockKeyHash

resp.WaitChain = make([]*deadlockPb.WaitForEntry, 0, len(errDeadlock.WaitChain))
for _, item := range errDeadlock.WaitChain {
resp.WaitChain = append(resp.WaitChain, &deadlockPb.WaitForEntry{
Txn: item.Txn,
WaitForTxn: item.WaitForTxn,
KeyHash: item.KeyHash,
Key: item.Key,
ResourceGroupTag: item.ResourceGroupTag,
})
}

return resp
}

Expand Down
48 changes: 43 additions & 5 deletions store/mockstore/unistore/tikv/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sync"
"time"

deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/log"
"go.uber.org/zap"
)
Expand All @@ -54,6 +55,12 @@ type txnKeyHashPair struct {
txn uint64
keyHash uint64
registerTime time.Time
diagCtx diagnosticContext
}

type diagnosticContext struct {
key []byte
resourceGroupTag []byte
}

func (p *txnKeyHashPair) isExpired(ttl time.Duration, nowTime time.Time) bool {
Expand All @@ -75,13 +82,27 @@ func NewDetector(ttl time.Duration, urgentSize uint64, expireInterval time.Durat
}

// Detect detects deadlock for the sourceTxn on a locked key.
func (d *Detector) Detect(sourceTxn, waitForTxn, keyHash uint64) *ErrDeadlock {
func (d *Detector) Detect(sourceTxn, waitForTxn, keyHash uint64, diagCtx diagnosticContext) *ErrDeadlock {
d.lock.Lock()
nowTime := time.Now()
d.activeExpire(nowTime)
err := d.doDetect(nowTime, sourceTxn, waitForTxn)
if err == nil {
d.register(sourceTxn, waitForTxn, keyHash)
d.register(sourceTxn, waitForTxn, keyHash, diagCtx)
} else {
// Reverse the wait chain so that the order will be each one waiting for the next one, and append the current
// entry that finally caused the deadlock.
for i := 0; i < len(err.WaitChain)/2; i++ {
j := len(err.WaitChain) - i - 1
err.WaitChain[i], err.WaitChain[j] = err.WaitChain[j], err.WaitChain[i]
}
err.WaitChain = append(err.WaitChain, &deadlockPB.WaitForEntry{
Txn: sourceTxn,
Key: diagCtx.key,
KeyHash: keyHash,
ResourceGroupTag: diagCtx.resourceGroupTag,
WaitForTxn: waitForTxn,
})
}
d.lock.Unlock()
return err
Expand All @@ -103,9 +124,26 @@ func (d *Detector) doDetect(nowTime time.Time, sourceTxn, waitForTxn uint64) *Er
continue
}
if keyHashPair.txn == sourceTxn {
return &ErrDeadlock{DeadlockKeyHash: keyHashPair.keyHash}
return &ErrDeadlock{DeadlockKeyHash: keyHashPair.keyHash,
WaitChain: []*deadlockPB.WaitForEntry{
{
Txn: waitForTxn,
Key: keyHashPair.diagCtx.key,
KeyHash: keyHashPair.keyHash,
ResourceGroupTag: keyHashPair.diagCtx.resourceGroupTag,
WaitForTxn: keyHashPair.txn,
},
},
}
}
if err := d.doDetect(nowTime, sourceTxn, keyHashPair.txn); err != nil {
err.WaitChain = append(err.WaitChain, &deadlockPB.WaitForEntry{
Txn: waitForTxn,
Key: keyHashPair.diagCtx.key,
KeyHash: keyHashPair.keyHash,
ResourceGroupTag: keyHashPair.diagCtx.resourceGroupTag,
WaitForTxn: keyHashPair.txn,
})
return err
}
}
Expand All @@ -115,9 +153,9 @@ func (d *Detector) doDetect(nowTime time.Time, sourceTxn, waitForTxn uint64) *Er
return nil
}

func (d *Detector) register(sourceTxn, waitForTxn, keyHash uint64) {
func (d *Detector) register(sourceTxn, waitForTxn, keyHash uint64, diagCtx diagnosticContext) {
val := d.waitForMap[sourceTxn]
pair := txnKeyHashPair{txn: waitForTxn, keyHash: keyHash, registerTime: time.Now()}
pair := txnKeyHashPair{txn: waitForTxn, keyHash: keyHash, registerTime: time.Now(), diagCtx: diagCtx}
if val == nil {
newList := &txnList{txns: list.New()}
newList.txns.PushBack(&pair)
Expand Down
37 changes: 29 additions & 8 deletions store/mockstore/unistore/tikv/detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

. "github.com/pingcap/check"
deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
)

func TestT(t *testing.T) {
Expand All @@ -42,40 +43,60 @@ var _ = Suite(&testDeadlockSuite{})
type testDeadlockSuite struct{}

func (s *testDeadlockSuite) TestDeadlock(c *C) {
makeDiagCtx := func(key string, resourceGroupTag string) diagnosticContext {
return diagnosticContext{
key: []byte(key),
resourceGroupTag: []byte(resourceGroupTag),
}
}
checkWaitChainEntry := func(entry *deadlockPB.WaitForEntry, txn, waitForTxn uint64, key, resourceGroupTag string) {
c.Assert(entry.Txn, Equals, txn)
c.Assert(entry.WaitForTxn, Equals, waitForTxn)
c.Assert(string(entry.Key), Equals, key)
c.Assert(string(entry.ResourceGroupTag), Equals, resourceGroupTag)
}

ttl := 50 * time.Millisecond
expireInterval := 100 * time.Millisecond
urgentSize := uint64(1)
detector := NewDetector(ttl, urgentSize, expireInterval)
err := detector.Detect(1, 2, 100)
err := detector.Detect(1, 2, 100, makeDiagCtx("k1", "tag1"))
c.Assert(err, IsNil)
c.Assert(detector.totalSize, Equals, uint64(1))
err = detector.Detect(2, 3, 200)
err = detector.Detect(2, 3, 200, makeDiagCtx("k2", "tag2"))
c.Assert(err, IsNil)
c.Assert(detector.totalSize, Equals, uint64(2))
err = detector.Detect(3, 1, 300)
err = detector.Detect(3, 1, 300, makeDiagCtx("k3", "tag3"))
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, fmt.Sprintf("deadlock"))
c.Assert(len(err.WaitChain), Equals, 3)
// The order of entries in the wait chain is specific: each item is waiting for the next one.
checkWaitChainEntry(err.WaitChain[0], 1, 2, "k1", "tag1")
checkWaitChainEntry(err.WaitChain[1], 2, 3, "k2", "tag2")
checkWaitChainEntry(err.WaitChain[2], 3, 1, "k3", "tag3")

c.Assert(detector.totalSize, Equals, uint64(2))
detector.CleanUp(2)
list2 := detector.waitForMap[2]
c.Assert(list2, IsNil)
c.Assert(detector.totalSize, Equals, uint64(1))

// After cycle is broken, no deadlock now.
err = detector.Detect(3, 1, 300)
diagCtx := diagnosticContext{}
err = detector.Detect(3, 1, 300, diagCtx)
c.Assert(err, IsNil)
list3 := detector.waitForMap[3]
c.Assert(list3.txns.Len(), Equals, 1)
c.Assert(detector.totalSize, Equals, uint64(2))

// Different keyHash grows the list.
err = detector.Detect(3, 1, 400)
err = detector.Detect(3, 1, 400, diagCtx)
c.Assert(err, IsNil)
c.Assert(list3.txns.Len(), Equals, 2)
c.Assert(detector.totalSize, Equals, uint64(3))

// Same waitFor and key hash doesn't grow the list.
err = detector.Detect(3, 1, 400)
err = detector.Detect(3, 1, 400, diagCtx)
c.Assert(err, IsNil)
c.Assert(list3.txns.Len(), Equals, 2)
c.Assert(detector.totalSize, Equals, uint64(3))
Expand All @@ -90,15 +111,15 @@ func (s *testDeadlockSuite) TestDeadlock(c *C) {

// after 100ms, all entries expired, detect non exist edges
time.Sleep(100 * time.Millisecond)
err = detector.Detect(100, 200, 100)
err = detector.Detect(100, 200, 100, diagCtx)
c.Assert(err, IsNil)
c.Assert(detector.totalSize, Equals, uint64(1))
c.Assert(len(detector.waitForMap), Equals, 1)

// expired entry should not report deadlock, detect will remove this entry
// not dependent on expire check interval
time.Sleep(60 * time.Millisecond)
err = detector.Detect(200, 100, 200)
err = detector.Detect(200, 100, 200, diagCtx)
c.Assert(err, IsNil)
c.Assert(detector.totalSize, Equals, uint64(1))
c.Assert(len(detector.waitForMap), Equals, 1)
Expand Down
2 changes: 2 additions & 0 deletions store/mockstore/unistore/tikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tikv
import (
"fmt"

deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/store/mockstore/unistore/tikv/mvcc"
)
Expand Down Expand Up @@ -90,6 +91,7 @@ type ErrDeadlock struct {
LockKey []byte
LockTS uint64
DeadlockKeyHash uint64
WaitChain []*deadlockPB.WaitForEntry
}

func (e ErrDeadlock) Error() string {
Expand Down
12 changes: 9 additions & 3 deletions store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,11 @@ func (store *MVCCStore) PessimisticLock(reqCtx *requestCtx, req *kvrpcpb.Pessimi
for _, m := range mutations {
lock, err := store.checkConflictInLockStore(reqCtx, m, startTS)
if err != nil {
return store.handleCheckPessimisticErr(startTS, err, req.IsFirstLock, req.WaitTimeout)
var resourceGroupTag []byte = nil
if req.Context != nil {
resourceGroupTag = req.Context.ResourceGroupTag
}
return store.handleCheckPessimisticErr(startTS, err, req.IsFirstLock, req.WaitTimeout, m.Key, resourceGroupTag)
}
if lock != nil {
if lock.Op != uint8(kvrpcpb.Op_PessimisticLock) {
Expand Down Expand Up @@ -533,11 +537,13 @@ func (store *MVCCStore) CheckSecondaryLocks(reqCtx *requestCtx, keys [][]byte, s
func (store *MVCCStore) normalizeWaitTime(lockWaitTime int64) time.Duration {
if lockWaitTime > store.conf.PessimisticTxn.WaitForLockTimeout {
lockWaitTime = store.conf.PessimisticTxn.WaitForLockTimeout
} else if lockWaitTime == 0 {
lockWaitTime = store.conf.PessimisticTxn.WaitForLockTimeout
}
return time.Duration(lockWaitTime) * time.Millisecond
}

func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isFirstLock bool, lockWaitTime int64) (*lockwaiter.Waiter, error) {
func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isFirstLock bool, lockWaitTime int64, key []byte, resourceGroupTag []byte) (*lockwaiter.Waiter, error) {
if locked, ok := err.(*ErrLocked); ok {
if lockWaitTime != lockwaiter.LockNoWait {
keyHash := farm.Fingerprint64(locked.Key)
Expand All @@ -546,7 +552,7 @@ func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isF
log.S().Debugf("%d blocked by %d on key %d", startTS, lock.StartTS, keyHash)
waiter := store.lockWaiterManager.NewWaiter(startTS, lock.StartTS, keyHash, waitTimeDuration)
if !isFirstLock {
store.DeadlockDetectCli.Detect(startTS, lock.StartTS, keyHash)
store.DeadlockDetectCli.Detect(startTS, lock.StartTS, keyHash, key, resourceGroupTag)
}
return waiter, err
}
Expand Down
12 changes: 7 additions & 5 deletions store/mockstore/unistore/tikv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (svr *Server) KvPessimisticLock(ctx context.Context, req *kvrpcpb.Pessimist
LockKey: errLocked.Key,
LockTS: errLocked.Lock.StartTS,
DeadlockKeyHash: result.DeadlockResp.DeadlockKeyHash,
WaitChain: result.DeadlockResp.WaitChain,
}
resp.Errors, resp.RegionError = convertToPBErrors(deadlockErr)
return resp, nil
Expand Down Expand Up @@ -845,11 +846,6 @@ func (svr *Server) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpc
return &kvrpcpb.ReadIndexResponse{}, nil
}

// GetLockWaitInfo implements implements the tikvpb.TikvServer interface.
func (svr *Server) GetLockWaitInfo(ctx context.Context, _ *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) {
return &kvrpcpb.GetLockWaitInfoResponse{}, nil
}

// transaction debugger commands.

// MvccGetByKey implements implements the tikvpb.TikvServer interface.
Expand Down Expand Up @@ -976,6 +972,11 @@ func (svr *Server) GetStoreSafeTS(context.Context, *kvrpcpb.StoreSafeTSRequest)
return &kvrpcpb.StoreSafeTSResponse{}, nil
}

// GetLockWaitInfo implements the tikvpb.TikvServer interface.
func (svr *Server) GetLockWaitInfo(context.Context, *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) {
panic("unimplemented")
}

func convertToKeyError(err error) *kvrpcpb.KeyError {
if err == nil {
return nil
Expand Down Expand Up @@ -1011,6 +1012,7 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
LockKey: x.LockKey,
LockTs: x.LockTS,
DeadlockKeyHash: x.DeadlockKeyHash,
WaitChain: x.WaitChain,
},
}
case *ErrCommitExpire:
Expand Down
1 change: 1 addition & 0 deletions store/tikv/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ type LockCtx struct {
ValuesLock sync.Mutex
LockExpired *uint32
Stats *util.LockKeysDetails
ResourceGroupTag []byte
}
2 changes: 1 addition & 1 deletion store/tikv/pessimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
WaitTimeout: action.LockWaitTime,
ReturnValues: action.ReturnValues,
MinCommitTs: c.forUpdateTS + 1,
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: action.LockCtx.ResourceGroupTag})
lockWaitStartTime := action.WaitStartTime
for {
// if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit
Expand Down
Loading