Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
108918: concurrency: use lock modes to detect conflicts in SKIP LOCKED r=nvanbenschoten a=arulajmani

This patch switches IsKeyLockedByConflictingTxn to make use of lock modes to detect conflicts. As a result, SKIP LOCKED requests correctly interact with shared locks. We add a few tests to demonstrate this.

Fixes cockroachdb#108715

Release note: None

Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
craig[bot] and arulajmani committed Aug 19, 2023
2 parents 6e160d5 + d0d6a5d commit a4a7758
Show file tree
Hide file tree
Showing 10 changed files with 374 additions and 112 deletions.
26 changes: 12 additions & 14 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,23 +773,21 @@ type lockTableGuard interface {
// that conflict.
CheckOptimisticNoConflicts(*lockspanset.LockSpanSet) (ok bool)

// IsKeyLockedByConflictingTxn returns whether the specified key is claimed
// (see claimantTxn()) by a conflicting transaction in the lockTableGuard's
// snapshot of the lock table, given the caller's own desired locking
// strength. If so, true is returned. If the key is locked, the lock holder is
// also returned. Otherwise, if the key was claimed by a concurrent request
// still sequencing through the lock table, but the lock isn't held (yet), nil
// is also returned.
// IsKeyLockedByConflictingTxn returns whether the specified key is locked by
// a conflicting transaction in the lockTableGuard's snapshot of the lock
// table, given the caller's own desired locking strength. If so, true is
// returned and so is the lock holder. If the lock is held by the transaction
// itself, there's no conflict to speak of, so false is returned.
//
// If the lock has been claimed (held or otherwise) by the transaction itself,
// there's no conflict to speak of, so false is returned. In cases where the
// lock isn't held, but the lock has been claimed by the transaction itself,
// we do not make a distinction about which request claimed the key -- it
// could either be the request itself, or a different concurrent request from
// the same transaction; The specifics do not affect the caller.
// This method is used by requests in conjunction with the SkipLocked wait
// policy to determine which keys they should skip over during evaluation.
IsKeyLockedByConflictingTxn(roachpb.Key, lock.Strength) (bool, *enginepb.TxnMeta)
//
// If the supplied lock strength is locking (!= lock.None), then any queued
// locking requests that came before the lockTableGuard will also be checked
// for conflicts. This helps prevent a stream of locking SKIP LOCKED requests
// from starving out regular locking requests. In such cases, true is
// returned, but so is nil.
IsKeyLockedByConflictingTxn(roachpb.Key, lock.Strength) (bool, *enginepb.TxnMeta, error)
}

// lockTableWaiter is concerned with waiting in lock wait-queues for locks held
Expand Down
26 changes: 12 additions & 14 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,25 +767,23 @@ func (g *Guard) CheckOptimisticNoLatchConflicts() (ok bool) {
return g.lm.CheckOptimisticNoConflicts(g.lg, g.Req.LatchSpans)
}

// IsKeyLockedByConflictingTxn returns whether the specified key is claimed
// (see claimantTxn()) by a conflicting transaction in the lockTableGuard's
// snapshot of the lock table, given the caller's own desired locking
// strength. If so, true is returned. If the key is locked, the lock holder is
// also returned. Otherwise, if the key was claimed by a concurrent request
// still sequencing through the lock table, but the lock isn't held (yet), nil
// is also returned.
// IsKeyLockedByConflictingTxn returns whether the specified key is locked by
// a conflicting transaction in the lockTableGuard's snapshot of the lock
// table, given the caller's own desired locking strength. If so, true is
// returned and so is the lock holder. If the lock is held by the transaction
// itself, there's no conflict to speak of, so false is returned.
//
// If the lock has been claimed (held or otherwise) by the transaction itself,
// there's no conflict to speak of, so false is returned. In cases where the
// lock isn't held, but the lock has been claimed by the transaction itself,
// we do not make a distinction about which request claimed the key -- it
// could either be the request itself, or a different concurrent request from
// the same transaction; The specifics do not affect the caller.
// This method is used by requests in conjunction with the SkipLocked wait
// policy to determine which keys they should skip over during evaluation.
//
// If the supplied lock strength is locking (!= lock.None), then any queued
// locking requests that came before the lockTableGuard will also be checked
// for conflicts. This helps prevent a stream of locking SKIP LOCKED requests
// from starving out regular locking requests. In such cases, true is
// returned, but so is nil.
func (g *Guard) IsKeyLockedByConflictingTxn(
key roachpb.Key, strength lock.Strength,
) (bool, *enginepb.TxnMeta) {
) (bool, *enginepb.TxnMeta, error) {
return g.ltg.IsKeyLockedByConflictingTxn(key, strength)
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,11 @@ func TestConcurrencyManagerBasic(t *testing.T) {
var key string
d.ScanArgs(t, "key", &key)
strength := concurrency.ScanLockStrength(t, d)
if ok, txn := g.IsKeyLockedByConflictingTxn(roachpb.Key(key), strength); ok {
ok, txn, err := g.IsKeyLockedByConflictingTxn(roachpb.Key(key), strength)
if err != nil {
return err.Error()
}
if ok {
holder := "<nil>"
if txn != nil {
holder = txn.ID.String()
Expand Down
87 changes: 56 additions & 31 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,46 +677,69 @@ func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(
}

func (g *lockTableGuardImpl) IsKeyLockedByConflictingTxn(
key roachpb.Key, strength lock.Strength,
) (bool, *enginepb.TxnMeta) {
key roachpb.Key, str lock.Strength,
) (bool, *enginepb.TxnMeta, error) {
iter := g.tableSnapshot.MakeIter()
iter.SeekGE(&lockState{key: key})
if !iter.Valid() || !iter.Cur().key.Equal(key) {
// No lock on key.
return false, nil
return false, nil, nil
}
l := iter.Cur()
l.mu.Lock()
defer l.mu.Unlock()
if l.isEmptyLock() {
// The lock is empty but has not yet been deleted.
return false, nil
return false, nil, nil
}
conflictingTxn, held := l.claimantTxn()
assert(conflictingTxn != nil, "non-empty lockState with no claimant transaction")
if !held {
if strength == lock.None {
// Non-locking reads only care about locks that are held.
return false, nil
}
if g.isSameTxn(conflictingTxn) {
return false, nil

if l.alreadyHoldsLockAndIsAllowedToProceed(g, str) {
// If another request from this transaction has already locked this key with
// sufficient locking strength then there's no conflict; we can proceed.
return false, nil, nil
}

if l.isHeld() {
lockHolderTxn, _ := l.getLockHolder()
if !g.isSameTxn(lockHolderTxn) &&
lock.Conflicts(l.getLockMode(), makeLockMode(str, g.txn, g.ts), &g.lt.settings.SV) {
			return true, l.holder.txn, nil // the key is locked by some other txn; return the holder
}
// If the key is claimed but the lock isn't held (yet), nil is returned.
return true, nil
// We can be in either of 2 cases at this point:
// 1. All locks held on this key are at non-conflicting strengths (i.e,
// they're compatible with the supplied strength).
// 2. OR one of the locks held on this key is by the transaction itself, and
// that lock is held with a lower strength. Simply put, the request is
// trying to upgrade its lock.
}
// Key locked.
txn, ts := l.getLockHolder()
if strength == lock.None && g.ts.Less(ts) {
// Non-locking read below lock's timestamp.
return false, nil

// There's no conflict with the lock holder itself. However, there may be
// other locking requests that came before us that we may conflict with.
// Checking for conflicts with the list of queuedWriters ensures fairness by
// preventing a stream of locking[1] SKIP LOCKED requests from starving out
// regular locking requests.
if str == lock.None { // [1] we only need to do this checking for locking requests
return false, nil, nil
}
if g.isSameTxn(txn) {
// Already locked by this txn.
return false, nil

for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
qqg := e.Value.(*queuedGuard)
if qqg.guard.seqNum > g.seqNum {
// We only need to check for conflicts with requests that came before us
// (read: have lower sequence numbers than us). Note that the list of
// queuedWriters is sorted in increasing order of sequence number.
break
}
if g.isSameTxn(qqg.guard.txnMeta()) {
return false, nil, errors.AssertionFailedf(
"SKIP LOCKED request should not find another waiting request from the same transaction",
)
}
if lock.Conflicts(qqg.mode, makeLockMode(str, g.txn, g.ts), &g.lt.settings.SV) {
return true, nil, nil // the conflict isn't with a lock holder, nil is returned
}
}
// "If the key is locked, the lock holder is also returned."
return true, txn
return false, nil, nil // no conflict
}

func (g *lockTableGuardImpl) notify() {
Expand Down Expand Up @@ -1912,7 +1935,7 @@ func (l *lockState) scanAndMaybeEnqueue(g *lockTableGuardImpl, notify bool) (wai

// It is possible that the lock is already held by this request's
// transaction, and it is held with a lock strength good enough for it.
if l.alreadyHoldsLockAndIsAllowedToProceed(g) {
if l.alreadyHoldsLockAndIsAllowedToProceed(g, g.curStrength()) {
return false /* wait */
}

Expand Down Expand Up @@ -1994,11 +2017,13 @@ func (l *lockState) constructWaitingState(g *lockTableGuardImpl) waitingState {

// alreadyHoldsLockAndIsAllowedToProceed returns true if the request, referenced
// by the supplied lock table guard, is allowed to proceed because its
// transaction already holds the lock with an equal or higher lock strength.
// Otherwise, false is returned.
// transaction already holds the lock with an equal or higher lock strength
// compared to the one supplied. Otherwise, false is returned.
//
// REQUIRES: l.mu to be locked.
func (l *lockState) alreadyHoldsLockAndIsAllowedToProceed(g *lockTableGuardImpl) bool {
func (l *lockState) alreadyHoldsLockAndIsAllowedToProceed(
g *lockTableGuardImpl, str lock.Strength,
) bool {
lockHolderTxn, _ := l.getLockHolder()
if lockHolderTxn == nil {
return false // no one holds the lock
Expand All @@ -2012,7 +2037,7 @@ func (l *lockState) alreadyHoldsLockAndIsAllowedToProceed(g *lockTableGuardImpl)
// is trying to promote a lock it previously acquired. In such cases, the
// existence of a lock with weaker strength doesn't do much for this request.
// It's no different from the case where it's trying to acquire a fresh lock.
return g.curStrength() <= heldMode.Strength ||
return str <= heldMode.Strength ||
// TODO(arul): We want to allow requests that are writing to keys that they
// hold exclusive locks on to "jump ahead" of any potential waiters. This
// prevents deadlocks. The logic here is a bandaid until we implement a
Expand All @@ -2022,7 +2047,7 @@ func (l *lockState) alreadyHoldsLockAndIsAllowedToProceed(g *lockTableGuardImpl)
// storing them in the list of queuedWriters. Instead of sorting the list
// of queuedWriters just based on sequence numbers alone, we'll instead use
// (belongsToALockHolderTxn, sequence number) to construct the sort order.
(g.curStrength() == lock.Intent && heldMode.Strength == lock.Exclusive)
(str == lock.Intent && heldMode.Strength == lock.Exclusive)
}

// conflictsWithLockHolder returns true if the request, referenced by the
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/concurrency/lock_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,11 @@ func TestLockTableBasic(t *testing.T) {
var key string
d.ScanArgs(t, "k", &key)
strength := ScanLockStrength(t, d)
if ok, txn := g.IsKeyLockedByConflictingTxn(roachpb.Key(key), strength); ok {
ok, txn, err := g.IsKeyLockedByConflictingTxn(roachpb.Key(key), strength)
if err != nil {
return err.Error()
}
if ok {
holder := "<nil>"
if txn != nil {
holder = txn.ID.String()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (g *mockLockTableGuard) CheckOptimisticNoConflicts(*lockspanset.LockSpanSet
}
func (g *mockLockTableGuard) IsKeyLockedByConflictingTxn(
roachpb.Key, lock.Strength,
) (bool, *enginepb.TxnMeta) {
) (bool, *enginepb.TxnMeta, error) {
panic("unimplemented")
}
func (g *mockLockTableGuard) notify() { g.signal <- struct{}{} }
Expand Down
Loading

0 comments on commit a4a7758

Please sign in to comment.