Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(7.1) Fix the issue that primary pessimistic lock may be left not cleared after GC (#866) #870

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 45 additions & 16 deletions integration_tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,52 +1472,81 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() {

s.NoError(failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/beforeCommitSecondaries", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return`))
s.NoError(failpoint.Enable("tikvclient/onRollback", `return("skipRollbackPessimisticLock")`))
defer func() {
s.NoError(failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback"))
s.NoError(failpoint.Disable("tikvclient/beforeCommitSecondaries"))
s.NoError(failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
s.NoError(failpoint.Disable("tikvclient/onRollback"))
}()

k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3")
k1, k2, k3, k4 := []byte("k1"), []byte("k2"), []byte("k3"), []byte("k4")
v2, v3 := []byte("v2"), []byte("v3")

ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))

txn, err := s.store.Begin()
txn1, err := s.store.Begin()
s.NoError(err)
txn.SetPessimistic(true)
txn1.SetPessimistic(true)

{
// Produce write conflict on key k2
txn2, err := s.store.Begin()
helperTxn, err := s.store.Begin()
s.NoError(err)
s.NoError(txn2.Set(k2, []byte("v0")))
s.NoError(txn2.Commit(ctx))
s.NoError(helperTxn.Set(k2, []byte("v0")))
s.NoError(helperTxn.Commit(ctx))
}

lockCtx := kv.NewLockCtx(txn.StartTS(), 200, time.Now())
err = txn.LockKeys(ctx, lockCtx, k1, k2)
lockCtx := kv.NewLockCtx(txn1.StartTS(), 200, time.Now())
err = txn1.LockKeys(ctx, lockCtx, k1, k2)
s.IsType(&tikverr.ErrWriteConflict{}, errors.Cause(err))

// k1 has txn's stale pessimistic lock now.
// k1 has txn1's stale pessimistic lock now.

forUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.NoError(err)
lockCtx = kv.NewLockCtx(forUpdateTS, 200, time.Now())
s.NoError(txn.LockKeys(ctx, lockCtx, k2, k3))
s.NoError(txn1.LockKeys(ctx, lockCtx, k2, k3))

s.NoError(txn.Set(k2, v2))
s.NoError(txn.Set(k3, v3))
s.NoError(txn.Commit(ctx))
s.NoError(txn1.Set(k2, v2))
s.NoError(txn1.Set(k3, v3))
s.NoError(txn1.Commit(ctx))

// k3 has txn's stale prewrite lock now.
// k3 has txn1's stale prewrite lock now.

// Perform ScanLock - BatchResolveLock.
txn2, err := s.store.Begin()
txn2.SetPessimistic(true)
s.NoError(err)
lockCtx = kv.NewLockCtx(txn1.StartTS(), 200, time.Now())
err = txn2.LockKeys(ctx, lockCtx, k4)
s.NoError(err)
s.NoError(txn2.Rollback())

// k4 has txn2's stale primary pessimistic lock now.
currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)

remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS)
s.NoError(err)

s.Len(remainingLocks, 3)
s.Equal(remainingLocks[0].Key, k1)
s.Equal(remainingLocks[0].LockType, kvrpcpb.Op_PessimisticLock)
s.Equal(remainingLocks[1].Key, k3)
s.Equal(remainingLocks[1].LockType, kvrpcpb.Op_Put)
s.Equal(remainingLocks[2].Key, k4)
s.Equal(remainingLocks[2].LockType, kvrpcpb.Op_PessimisticLock)
s.Equal(remainingLocks[2].Primary, k4)

// Perform ScanLock - BatchResolveLock.
s.NoError(err)
s.NoError(s.store.GCResolveLockPhase(ctx, currentTS, 1))

// Do ScanLock again to make sure no locks are left.
remainingLocks, err = s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS)
s.NoError(err)
s.Empty(remainingLocks)

// Check data consistency
readTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
snapshot := s.store.GetSnapshot(readTS)
Expand Down
37 changes: 37 additions & 0 deletions tikv/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
package tikv

import (
"bytes"
"context"

"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -114,6 +115,42 @@ func (s StoreProbe) GCResolveLockPhase(ctx context.Context, safepoint uint64, co
return s.resolveLocks(ctx, safepoint, concurrency)
}

func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxVersion uint64) ([]*txnlock.Lock, error) {
bo := NewGcResolveLockMaxBackoffer(ctx)
const limit = 1024

var result []*txnlock.Lock

outerLoop:
for {
locks, loc, err := s.KVStore.scanLocksInRegionWithStartKey(bo, startKey, maxVersion, limit)
if err != nil {
return nil, err
}
for _, l := range locks {
if bytes.Compare(endKey, l.Key) <= 0 {
// Finished scanning the given range.
break outerLoop
}
result = append(result, l)
}

if len(locks) < limit {
if len(loc.EndKey) == 0 {
// Scanned to the very end.
break outerLoop
}
// The current region is completely scanned.
startKey = loc.EndKey
} else {
// The current region may still have more locks.
startKey = append(locks[len(locks)-1].Key, 0)
}
}

return result, nil
}

// LockResolverProbe wraps a LockResolver and exposes internal stats for testing purpose.
type LockResolverProbe struct {
*txnlock.LockResolverProbe
Expand Down
19 changes: 18 additions & 1 deletion txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,10 +556,27 @@ func (txn *KVTxn) Rollback() error {
txn.CancelAggressiveLocking(context.Background())
}

// `skipPessimisticRollback` may be true only when set by failpoint in tests.
skipPessimisticRollback := false
if val, err := util.EvalFailpoint("onRollback"); err == nil {
if s, ok := val.(string); ok {
if s == "skipRollbackPessimisticLock" {
logutil.BgLogger().Info("[failpoint] injected skip pessimistic rollback on explicit rollback",
zap.Uint64("txnStartTS", txn.startTS))
skipPessimisticRollback = true
} else {
panic(fmt.Sprintf("unknown instruction %s for failpoint \"onRollback\"", s))
}
}
}

start := time.Now()
// Clean up pessimistic lock.
if txn.IsPessimistic() && txn.committer != nil {
err := txn.rollbackPessimisticLocks()
var err error
if !skipPessimisticRollback {
err = txn.rollbackPessimisticLocks()
}
txn.committer.ttlManager.close()
if err != nil {
logutil.BgLogger().Error(err.Error())
Expand Down
17 changes: 11 additions & 6 deletions txnkv/txnlock/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,26 +245,29 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
}
metrics.LockResolverCountWithExpired.Inc()

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
return false, err
}

if l.LockType == kvrpcpb.Op_PessimisticLock {
// BatchResolveLocks forces resolving the locks ignoring whether whey are expired.
// For pessimistic locks, committing them makes no sense, but it won't affect transaction
// correctness if we always roll back them.
// Pessimistic locks needs special handling logic because their primary may not point
// to the real primary of that transaction, and their state cannot be put in `txnInfos`.
// (see: https://github.com/pingcap/tidb/issues/42937).
//
// `resolvePessimisticLock` should be called after calling `getTxnStatus`.
// See: https://github.com/pingcap/tidb/issues/45134
err := lr.resolvePessimisticLock(bo, l)
if err != nil {
return false, err
}
continue
}

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
return false, err
}

// If the transaction uses async commit, CheckTxnStatus will reject rolling back the primary lock.
// Then we need to check the secondary locks to determine the final status of the transaction.
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit {
Expand Down Expand Up @@ -1172,6 +1175,8 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
}
}

// resolvePessimisticLock handles pessimistic locks after checking txn status.
// Note that this function assumes `CheckTxnStatus` is done (or `getTxnStatusFromLock` has been called) on the lock.
func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) error {
metrics.LockResolverCountWithResolveLocks.Inc()
// The lock has been resolved by getTxnStatusFromLock.
Expand Down