add resolve lock details
Signed-off-by: you06 <[email protected]>
you06 committed May 19, 2022
1 parent 91986d2 commit 3a40741
Showing 15 changed files with 166 additions and 97 deletions.
20 changes: 10 additions & 10 deletions integration_tests/lock_test.go
@@ -262,7 +262,7 @@ func (s *testLockSuite) TestCheckTxnStatusTTL() {
// Rollback the txn.
lock := s.mustGetLock([]byte("key"))

-err = s.store.NewLockResolver().ForceResolveLock(context.Background(), lock)
+err = s.store.NewLockResolver().ForceResolveLock(context.Background(), lock, nil)
s.Nil(err)

// Check its status is rollbacked.
@@ -295,7 +295,7 @@ func (s *testLockSuite) TestTxnHeartBeat() {
s.Equal(newTTL, uint64(6666))

lock := s.mustGetLock([]byte("key"))
-err = s.store.NewLockResolver().ForceResolveLock(context.Background(), lock)
+err = s.store.NewLockResolver().ForceResolveLock(context.Background(), lock, nil)
s.Nil(err)

newTTL, err = s.store.SendTxnHeartbeat(context.Background(), []byte("key"), txn.StartTS(), 6666)
@@ -327,13 +327,13 @@ func (s *testLockSuite) TestCheckTxnStatus() {

// Test the ResolveLocks API
lock := s.mustGetLock([]byte("second"))
-timeBeforeExpire, err := resolver.ResolveLocks(bo, currentTS, []*txnkv.Lock{lock})
+timeBeforeExpire, err := resolver.ResolveLocks(bo, currentTS, []*txnkv.Lock{lock}, nil)
s.Nil(err)
s.True(timeBeforeExpire > int64(0))

// Force rollback the lock using lock.TTL = 0.
lock.TTL = uint64(0)
-timeBeforeExpire, err = resolver.ResolveLocks(bo, currentTS, []*txnkv.Lock{lock})
+timeBeforeExpire, err = resolver.ResolveLocks(bo, currentTS, []*txnkv.Lock{lock}, nil)
s.Nil(err)
s.Equal(timeBeforeExpire, int64(0))

@@ -577,19 +577,19 @@ func (s *testLockSuite) TestZeroMinCommitTS() {
s.Nil(failpoint.Disable("tikvclient/mockZeroCommitTS"))

lock := s.mustGetLock([]byte("key"))
-expire, pushed, _, err := s.store.NewLockResolver().ResolveLocksForRead(bo, 0, []*txnkv.Lock{lock}, true)
+expire, pushed, _, err := s.store.NewLockResolver().ResolveLocksForRead(bo, 0, []*txnkv.Lock{lock}, nil, true)
s.Nil(err)
s.Len(pushed, 0)
s.Greater(expire, int64(0))

-expire, pushed, _, err = s.store.NewLockResolver().ResolveLocksForRead(bo, math.MaxUint64, []*txnkv.Lock{lock}, true)
+expire, pushed, _, err = s.store.NewLockResolver().ResolveLocksForRead(bo, math.MaxUint64, []*txnkv.Lock{lock}, nil, true)
s.Nil(err)
s.Len(pushed, 1)
s.Equal(expire, int64(0))

// Clean up this test.
lock.TTL = uint64(0)
-expire, err = s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
+expire, err = s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock}, nil)
s.Nil(err)
s.Equal(expire, int64(0))
}
@@ -646,7 +646,7 @@ func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit() {
lock := s.mustGetLock([]byte("fb1"))
s.True(lock.UseAsyncCommit)
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
-expire, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
+expire, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock}, nil)
s.Nil(err)
s.Equal(expire, int64(0))

@@ -912,7 +912,7 @@ func (s *testLockSuite) TestResolveLocksForRead() {
// rolled back
startTS, _ = s.lockKey([]byte("k2"), []byte("v2"), []byte("k22"), []byte("v22"), 3000, false, false)
lock = s.mustGetLock([]byte("k22"))
-err := s.store.NewLockResolver().ForceResolveLock(ctx, lock)
+err := s.store.NewLockResolver().ForceResolveLock(ctx, lock, nil)
s.Nil(err)
resolvedLocks = append(resolvedLocks, startTS)
lock = s.mustGetLock([]byte("k2"))
@@ -964,7 +964,7 @@ func (s *testLockSuite) TestResolveLocksForRead() {
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
lr := s.store.NewLockResolver()
defer lr.Close()
-msBeforeExpired, resolved, committed, err := lr.ResolveLocksForRead(bo, readStartTS, locks, false)
+msBeforeExpired, resolved, committed, err := lr.ResolveLocksForRead(bo, readStartTS, locks, nil, false)
s.Nil(err)
s.Greater(msBeforeExpired, int64(0))
s.Equal(resolvedLocks, resolved)
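Beyond updating the existing call sites with a trailing nil, a follow-up test (not part of this commit) could exercise the new detail argument end to end. The sketch below reuses this file's lockKey/mustGetLock helpers and assumes the util and sync/atomic imports plus an atomically updated ResolveLockTime int64 field on util.ResolveLockDetail, as the lock_resolver.go changes later in this diff suggest.

```go
func (s *testLockSuite) TestForceResolveLockRecordsDetail() {
	// Leave locks behind, then force-resolve one of them while collecting details.
	s.lockKey([]byte("k1"), []byte("v1"), []byte("k2"), []byte("v2"), 3000, false, false)
	lock := s.mustGetLock([]byte("k2"))

	detail := &util.ResolveLockDetail{}
	err := s.store.NewLockResolver().ForceResolveLock(context.Background(), lock, detail)
	s.Nil(err)
	// ForceResolveLock forwards detail into ResolveLocks, which accumulates the
	// time spent resolving, so the recorded duration should now be non-zero.
	s.Greater(atomic.LoadInt64(&detail.ResolveLockTime), int64(0))
}
```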
5 changes: 3 additions & 2 deletions tikv/test_probe.go
@@ -36,6 +36,7 @@ package tikv

import (
"context"
"github.com/tikv/client-go/v2/util"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/client-go/v2/internal/retry"
@@ -121,11 +122,11 @@ func NewLockResolverProb(r *txnlock.LockResolver) *LockResolverProbe {
}

// ForceResolveLock forces to resolve a single lock. It's a helper function only for writing test.
-func (l LockResolverProbe) ForceResolveLock(ctx context.Context, lock *txnlock.Lock) error {
+func (l LockResolverProbe) ForceResolveLock(ctx context.Context, lock *txnlock.Lock, detail *util.ResolveLockDetail) error {
bo := retry.NewBackofferWithVars(ctx, transaction.ConfigProbe{}.GetPessimisticLockMaxBackoff(), nil)
// make use of forcing resolving lock
lock.TTL = 0
-_, err := l.LockResolverProbe.ResolveLocks(bo, 0, []*txnlock.Lock{lock})
+_, err := l.LockResolverProbe.ResolveLocks(bo, 0, []*txnlock.Lock{lock}, detail)
return err
}

8 changes: 7 additions & 1 deletion txnkv/transaction/2pc.go
@@ -646,7 +646,13 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
return err
}

-commitDetail := &util.CommitDetails{WriteSize: size, WriteKeys: c.mutations.Len()}
+commitDetail := &util.CommitDetails{
+WriteSize: size,
+WriteKeys: c.mutations.Len(),
+ResolveLock: util.ResolveLockDetail{
+RequestSource: txn.RequestSource,
+},
+}
metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys))
metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))
c.hasNoNeedCommitKeys = checkCnt > 0
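The util.ResolveLockDetail type constructed above is not itself part of this diff. Inferred from this literal and from how lock_resolver.go consumes it below, its shape is roughly the following sketch (field names assumed from the call sites, not the authoritative definition):

```go
// Sketch only: the shape of util.ResolveLockDetail implied by this commit.
package util

// ResolveLockDetail carries the context and statistics of resolving locks.
type ResolveLockDetail struct {
	// ResolveLockTime accumulates the time spent resolving locks, in nanoseconds.
	// It is updated via atomic.AddInt64 so asynchronous resolvers can share it.
	ResolveLockTime int64
	// RequestSource supplies GetRequestSource(), which the resolver attaches to
	// CheckTxnStatus and ResolveLock RPCs.
	*RequestSource
}
```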
2 changes: 1 addition & 1 deletion txnkv/transaction/cleanup.go
@@ -67,7 +67,7 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer
SyncLog: c.syncLog,
ResourceGroupTag: c.resourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
-RequestSource: c.txn.getRequestSource(),
+RequestSource: c.txn.GetRequestSource(),
})
if c.resourceGroupTag == nil && c.resourceGroupTagger != nil {
c.resourceGroupTagger(req)
2 changes: 1 addition & 1 deletion txnkv/transaction/commit.go
@@ -76,7 +76,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
ResourceGroupTag: c.resourceGroupTag,
DiskFullOpt: c.diskFullOpt,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
-RequestSource: c.txn.getRequestSource(),
+RequestSource: c.txn.GetRequestSource(),
})
if c.resourceGroupTag == nil && c.resourceGroupTagger != nil {
c.resourceGroupTagger(req)
8 changes: 2 additions & 6 deletions txnkv/transaction/pessimistic.go
@@ -111,7 +111,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
SyncLog: c.syncLog,
ResourceGroupTag: action.LockCtx.ResourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
-RequestSource: c.txn.getRequestSource(),
+RequestSource: c.txn.GetRequestSource(),
})
if action.LockCtx.ResourceGroupTag == nil && action.LockCtx.ResourceGroupTagger != nil {
req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest))
@@ -230,14 +230,10 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
}
// Because we already waited on tikv, no need to Backoff here.
// tikv default will wait 3s(also the maximum wait value) when lock error occurs
-startTime = time.Now()
-msBeforeTxnExpired, err := c.store.GetLockResolver().ResolveLocks(bo, 0, locks)
+msBeforeTxnExpired, err := c.store.GetLockResolver().ResolveLocks(bo, 0, locks, &action.LockCtx.Stats.ResolveLock)
if err != nil {
return err
}
-if action.LockCtx.Stats != nil {
-atomic.AddInt64(&action.LockCtx.Stats.ResolveLockTime, int64(time.Since(startTime)))
-}

// If msBeforeTxnExpired is not zero, it means there are still locks blocking us acquiring
// the pessimistic lock. We should return acquire fail with nowait set or timeout error if necessary.
6 changes: 2 additions & 4 deletions txnkv/transaction/prewrite.go
@@ -161,7 +161,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
ResourceGroupTag: c.resourceGroupTag,
DiskFullOpt: c.diskFullOpt,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
-RequestSource: c.txn.getRequestSource(),
+RequestSource: c.txn.GetRequestSource(),
})
if c.resourceGroupTag == nil && c.resourceGroupTagger != nil {
c.resourceGroupTagger(r)
@@ -363,12 +363,10 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
}
locks = append(locks, lock)
}
-start := time.Now()
-msBeforeExpired, err := c.store.GetLockResolver().ResolveLocks(bo, c.startTS, locks)
+msBeforeExpired, err := c.store.GetLockResolver().ResolveLocks(bo, c.startTS, locks, &c.getDetail().ResolveLock)
if err != nil {
return err
}
-atomic.AddInt64(&c.getDetail().ResolveLockTime, int64(time.Since(start)))
if msBeforeExpired > 0 {
err = bo.BackoffWithCfgAndMaxSleep(retry.BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
26 changes: 11 additions & 15 deletions txnkv/transaction/txn.go
@@ -122,10 +122,9 @@ type KVTxn struct {
diskFullOpt kvrpcpb.DiskFullOpt
commitTSUpperBoundCheck func(uint64) bool
// interceptor is used to decorate the RPC request logic related to the txn.
-interceptor interceptor.RPCInterceptor
-assertionLevel kvrpcpb.AssertionLevel
-RequestSourceInternal bool
-RequestSourceType string
+interceptor interceptor.RPCInterceptor
+assertionLevel kvrpcpb.AssertionLevel
+*util.RequestSource
}

// NewTiKVTxn creates a new KVTxn.
@@ -143,7 +142,10 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64,
enableAsyncCommit: cfg.EnableAsyncCommit,
enable1PC: cfg.Enable1PC,
diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull,
+RequestSource: &util.RequestSource{},
}
+// replace the request source of snapshot with txn's.
+snapshot.RequestSource = newTiKVTxn.RequestSource
return newTiKVTxn, nil
}

@@ -672,6 +674,9 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput

lockCtx.Stats = &util.LockKeysDetails{
LockKeys: int32(len(keys)),
+ResolveLock: util.ResolveLockDetail{
+RequestSource: txn.RequestSource,
+},
}
bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars)
txn.committer.forUpdateTS = lockCtx.ForUpdateTS
@@ -881,19 +886,10 @@ func (txn *KVTxn) GetClusterID() uint64 {

// SetRequestSourceInternal sets the scope of the request source.
func (txn *KVTxn) SetRequestSourceInternal(internal bool) {
-txn.RequestSourceInternal = internal
-txn.snapshot.RequestSourceInternal = internal
+txn.RequestSource.SetRequestSourceInternal(internal)
}

// SetRequestSourceType sets the type of the request source.
func (txn *KVTxn) SetRequestSourceType(tp string) {
-txn.RequestSourceType = tp
-txn.snapshot.RequestSourceType = tp
-}
-
-func (txn *KVTxn) getRequestSource() string {
-if txn.RequestSourceInternal {
-return "internal_" + txn.RequestSourceType
-}
-return "external_" + txn.RequestSourceType
+txn.RequestSource.SetRequestSourceType(tp)
}
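The embedded *util.RequestSource replaces the two per-struct fields and the removed getRequestSource helper, so KVTxn and its snapshot now share one instance. Its definition is outside this diff; a minimal sketch consistent with the calls above (assumed, not authoritative) is:

```go
package util

// RequestSource is shared between a transaction and its snapshot so that both
// report the same origin on their RPCs. Sketch inferred from this commit's usage.
type RequestSource struct {
	RequestSourceInternal bool   // whether the request comes from an internal component
	RequestSourceType     string // a free-form type label, e.g. the statement kind
}

// SetRequestSourceInternal sets the scope of the request source.
func (r *RequestSource) SetRequestSourceInternal(internal bool) {
	r.RequestSourceInternal = internal
}

// SetRequestSourceType sets the type of the request source.
func (r *RequestSource) SetRequestSourceType(tp string) {
	r.RequestSourceType = tp
}

// GetRequestSource renders the source string attached to RPCs, preserving the
// semantics of the KVTxn.getRequestSource helper removed above.
func (r *RequestSource) GetRequestSource() string {
	if r.RequestSourceInternal {
		return "internal_" + r.RequestSourceType
	}
	return "external_" + r.RequestSourceType
}
```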
45 changes: 30 additions & 15 deletions txnkv/txnlock/lock_resolver.go
@@ -22,6 +22,7 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
@@ -224,7 +225,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
metrics.LockResolverCountWithExpired.Inc()

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
-status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
+status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l, nil)
if err != nil {
return false, err
}
@@ -238,7 +239,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
continue
}
if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok {
-status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, true, l)
+status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, true, l, nil)
if err != nil {
return false, err
}
@@ -310,19 +311,19 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
// commit status.
// 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
-func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
-ttl, _, _, err := lr.resolveLocks(bo, callerStartTS, locks, false, false)
+func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, detail *util.ResolveLockDetail) (int64, error) {
+ttl, _, _, err := lr.resolveLocks(bo, callerStartTS, locks, detail, false, false)
return ttl, err
}

// ResolveLocksForRead is essentially the same as ResolveLocks, except with some optimizations for read.
// Read operations needn't wait for resolve secondary locks and can read through(the lock's transaction is committed
// and its commitTS is less than or equal to callerStartTS) or ignore(the lock's transaction is rolled back or its minCommitTS is pushed) the lock .
-func (lr *LockResolver) ResolveLocksForRead(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) {
-return lr.resolveLocks(bo, callerStartTS, locks, true, lite)
+func (lr *LockResolver) ResolveLocksForRead(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, detail *util.ResolveLockDetail, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) {
+return lr.resolveLocks(bo, callerStartTS, locks, detail, true, lite)
}

-func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, forRead bool, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) {
+func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, detail *util.ResolveLockDetail, forRead bool, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) {
if lr.testingKnobs.meetLock != nil {
lr.testingKnobs.meetLock(locks)
}
@@ -331,13 +332,21 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64,
return msBeforeTxnExpired.value(), nil, nil, nil
}
metrics.LockResolverCountWithResolve.Inc()
+// This is the origin resolve lock time.
+// TODO(you06): record the more details and calculate the total time by calculating the sum of details.
+if detail != nil {
+startTime := time.Now()
+defer func() {
+atomic.AddInt64(&detail.ResolveLockTime, int64(time.Since(startTime)))
+}()
+}

// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[locate.RegionVerID]struct{})
var resolve func(*Lock, bool) (TxnStatus, error)
resolve = func(l *Lock, forceSyncCommit bool) (TxnStatus, error) {
-status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit)
+status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit, detail)
if err != nil {
return TxnStatus{}, err
}
@@ -374,14 +383,14 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64,
go func() {
// Pass an empty cleanRegions here to avoid data race and
// let `reqCollapse` deduplicate identical resolve requests.
-err := lr.resolveLock(asyncBo, l, status, lite, map[locate.RegionVerID]struct{}{})
+err := lr.resolveLock(asyncBo, l, status, lite, map[locate.RegionVerID]struct{}{}, true, detail)
if err != nil {
logutil.BgLogger().Info("failed to resolve lock asynchronously",
zap.String("lock", l.String()), zap.Uint64("commitTS", status.CommitTS()), zap.Error(err))
}
}()
} else {
-err = lr.resolveLock(bo, l, status, lite, cleanRegions)
+err = lr.resolveLock(bo, l, status, lite, cleanRegions, false, detail)
}
}
return status, err
@@ -462,10 +471,10 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary
if err != nil {
return status, err
}
-return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true, false, nil)
+return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true, false, nil, nil)
}

-func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, callerStartTS uint64, forceSyncCommit bool) (TxnStatus, error) {
+func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, callerStartTS uint64, forceSyncCommit bool, detail *util.ResolveLockDetail) (TxnStatus, error) {
var currentTS uint64
var err error
var status TxnStatus
@@ -488,7 +497,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, calle
time.Sleep(100 * time.Millisecond)
}
for {
-status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist, forceSyncCommit, l)
+status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist, forceSyncCommit, l, detail)
if err == nil {
return status, nil
}
@@ -547,7 +556,7 @@ func (e txnNotFoundErr) Error() string {
// getTxnStatus sends the CheckTxnStatus request to the TiKV server.
// When rollbackIfNotExist is false, the caller should be careful with the txnNotFoundErr error.
func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary []byte,
-callerStartTS, currentTS uint64, rollbackIfNotExist bool, forceSyncCommit bool, lockInfo *Lock) (TxnStatus, error) {
+callerStartTS, currentTS uint64, rollbackIfNotExist bool, forceSyncCommit bool, lockInfo *Lock, detail *util.ResolveLockDetail) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}
@@ -574,6 +583,9 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary
ForceSyncCommit: forceSyncCommit,
ResolvingPessimisticLock: resolvingPessimisticLock,
})
+if detail != nil {
+req.RequestSource = detail.GetRequestSource()
+}
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
if err != nil {
@@ -912,7 +924,7 @@ func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region
return nil
}

-func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStatus, lite bool, cleanRegions map[locate.RegionVerID]struct{}) error {
+func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStatus, lite bool, cleanRegions map[locate.RegionVerID]struct{}, async bool, detail *util.ResolveLockDetail) error {
util.EvalFailpoint("resolveLock")

metrics.LockResolverCountWithResolveLocks.Inc()
@@ -946,6 +958,9 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
}
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq)
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
+if detail != nil {
+req.RequestSource = detail.GetRequestSource()
+}
resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
if err != nil {
return err
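Taken together, the new parameter threads one ResolveLockDetail from the caller down to the CheckTxnStatus and ResolveLock RPCs. The hypothetical wrapper below (names and placement assumed; it mirrors the prewrite.go and pessimistic.go call sites above) shows the intended usage pattern: build one detail per operation, let the resolver tag its requests and accumulate the time spent, then read the total back instead of timing every call site separately.

```go
package transaction // hypothetical placement, mirroring the prewrite/pessimistic call sites

import (
	"sync/atomic"
	"time"

	"github.com/tikv/client-go/v2/internal/retry"
	"github.com/tikv/client-go/v2/txnkv/txnlock"
	"github.com/tikv/client-go/v2/util"
)

// resolveWithDetail is a hypothetical wrapper, not part of this commit.
func resolveWithDetail(bo *retry.Backoffer, lr *txnlock.LockResolver, callerStartTS uint64,
	locks []*txnlock.Lock, src *util.RequestSource) (int64, time.Duration, error) {
	detail := &util.ResolveLockDetail{RequestSource: src}
	// The resolver tags its RPCs with detail.GetRequestSource() and adds the
	// elapsed wall time to detail.ResolveLockTime.
	msBeforeTxnExpired, err := lr.ResolveLocks(bo, callerStartTS, locks, detail)
	if err != nil {
		return 0, 0, err
	}
	// ResolveLockTime is updated with atomic.AddInt64 inside resolveLocks (including by
	// asynchronous secondary-lock resolution), so read it atomically as well.
	elapsed := time.Duration(atomic.LoadInt64(&detail.ResolveLockTime))
	return msBeforeTxnExpired, elapsed, nil
}
```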
(The diffs of the remaining changed files were not loaded in this view.)
