Skip to content

Commit

Permalink
concurrency: use lock modes to detect conflicts in SKIP LOCKED
Browse files Browse the repository at this point in the history
This patch switches isKeyLockedByConflictingTxn to make use of lock
modes to detect conflicts. As a result, SKIP LOCKED requests correctly
interact with shared locks. We add a few tests to demonstrate this.

Fixes #108715

Release note: None
  • Loading branch information
arulajmani committed Aug 18, 2023
1 parent e160427 commit 6342e6f
Show file tree
Hide file tree
Showing 5 changed files with 326 additions and 97 deletions.
24 changes: 11 additions & 13 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,22 +773,20 @@ type lockTableGuard interface {
// that conflict.
CheckOptimisticNoConflicts(*lockspanset.LockSpanSet) (ok bool)

// IsKeyLockedByConflictingTxn returns whether the specified key is claimed
// (see claimantTxn()) by a conflicting transaction in the lockTableGuard's
// snapshot of the lock table, given the caller's own desired locking
// strength. If so, true is returned. If the key is locked, the lock holder is
// also returned. Otherwise, if the key was claimed by a concurrent request
// still sequencing through the lock table, but the lock isn't held (yet), nil
// is also returned.
// IsKeyLockedByConflictingTxn returns whether the specified key is locked by
// a conflicting transaction in the lockTableGuard's snapshot of the lock
// table, given the caller's own desired locking strength. If so, true is
// returned and so is the lock holder. If the lock is held by the transaction
// itself, there's no conflict to speak of, so false is returned.
//
// If the lock has been claimed (held or otherwise) by the transaction itself,
// there's no conflict to speak of, so false is returned. In cases where the
// lock isn't held, but the lock has been claimed by the transaction itself,
// we do not make a distinction about which request claimed the key -- it
// could either be the request itself, or a different concurrent request from
// the same transaction; The specifics do not affect the caller.
// This method is used by requests in conjunction with the SkipLocked wait
// policy to determine which keys they should skip over during evaluation.
//
// If the supplied lock strength is locking (!= lock.None), then any queued
// locking requests that came before the lockTableGuard will also be checked
// for conflicts. This helps prevent a stream of locking SKIP LOCKED requests
// from starving out regular locking requests. In such cases, true is
// returned, but so is nil.
IsKeyLockedByConflictingTxn(roachpb.Key, lock.Strength) (bool, *enginepb.TxnMeta)
}

Expand Down
24 changes: 11 additions & 13 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,22 +767,20 @@ func (g *Guard) CheckOptimisticNoLatchConflicts() (ok bool) {
return g.lm.CheckOptimisticNoConflicts(g.lg, g.Req.LatchSpans)
}

// IsKeyLockedByConflictingTxn returns whether the specified key is claimed
// (see claimantTxn()) by a conflicting transaction in the lockTableGuard's
// snapshot of the lock table, given the caller's own desired locking
// strength. If so, true is returned. If the key is locked, the lock holder is
// also returned. Otherwise, if the key was claimed by a concurrent request
// still sequencing through the lock table, but the lock isn't held (yet), nil
// is also returned.
// IsKeyLockedByConflictingTxn returns whether the specified key is locked by
// a conflicting transaction in the lockTableGuard's snapshot of the lock
// table, given the caller's own desired locking strength. If so, true is
// returned and so is the lock holder. If the lock is held by the transaction
// itself, there's no conflict to speak of, so false is returned.
//
// If the lock has been claimed (held or otherwise) by the transaction itself,
// there's no conflict to speak of, so false is returned. In cases where the
// lock isn't held, but the lock has been claimed by the transaction itself,
// we do not make a distinction about which request claimed the key -- it
// could either be the request itself, or a different concurrent request from
// the same transaction; The specifics do not affect the caller.
// This method is used by requests in conjunction with the SkipLocked wait
// policy to determine which keys they should skip over during evaluation.
//
// If the supplied lock strength is locking (!= lock.None), then any queued
// locking requests that came before the lockTableGuard will also be checked
// for conflicts. This helps prevent a stream of locking SKIP LOCKED requests
// from starving out regular locking requests. In such cases, true is
// returned, but so is nil.
func (g *Guard) IsKeyLockedByConflictingTxn(
key roachpb.Key, strength lock.Strength,
) (bool, *enginepb.TxnMeta) {
Expand Down
81 changes: 53 additions & 28 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(
}

func (g *lockTableGuardImpl) IsKeyLockedByConflictingTxn(
key roachpb.Key, strength lock.Strength,
key roachpb.Key, str lock.Strength,
) (bool, *enginepb.TxnMeta) {
iter := g.tableSnapshot.MakeIter()
iter.SeekGE(&lockState{key: key})
Expand All @@ -692,31 +692,54 @@ func (g *lockTableGuardImpl) IsKeyLockedByConflictingTxn(
// The lock is empty but has not yet been deleted.
return false, nil
}
conflictingTxn, held := l.claimantTxn()
assert(conflictingTxn != nil, "non-empty lockState with no claimant transaction")
if !held {
if strength == lock.None {
// Non-locking reads only care about locks that are held.
return false, nil
}
if g.isSameTxn(conflictingTxn) {
return false, nil
}
// If the key is claimed but the lock isn't held (yet), nil is returned.
return true, nil
}
// Key locked.
txn, ts := l.getLockHolder()
if strength == lock.None && g.ts.Less(ts) {
// Non-locking read below lock's timestamp.

if l.alreadyHoldsLockAndIsAllowedToProceed(g, str) {
// If another request from this transaction has already locked this key with
// sufficient locking strength then there's no conflict; we can proceed.
return false, nil
}
if g.isSameTxn(txn) {
// Already locked by this txn.

if l.isHeld() {
lockHolderTxn, _ := l.getLockHolder()
if !g.isSameTxn(lockHolderTxn) &&
lock.Conflicts(l.getLockMode(), makeLockMode(str, g.txn, g.ts), &g.lt.settings.SV) {
return true, l.holder.txn // they key is locked by some other transaction; return the holder
}
// We can be in either of 2 cases at this point:
// 1. All locks held on this key are at non-conflicting strengths (i.e,
// they're compatible with the supplied strength).
// 2. OR one of the locks held on this key is by the transaction itself, and
// that lock is held with a lower strength. Simply put, the request is
// trying to upgrade its lock.
}

// There's no conflict with the lock holder itself. However, there may be
// other locking requests that came before us that we may conflict with.
// Checking for conflicts with the list of queuedWriters ensures fairness by
// preventing a stream of locking[1] SKIP LOCKED requests from starving out
// regular locking requests.
if str == lock.None { // [1] we only need to do this checking for locking requests
return false, nil
}
// "If the key is locked, the lock holder is also returned."
return true, txn

for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
qqg := e.Value.(*queuedGuard)
if qqg.guard.seqNum > g.seqNum {
// We only need to check for conflicts with requests that came before us
// (read: have lower sequence numbers than us). Note that the list of
// queuedWriters is sorted in increasing order of sequence number.
break
}
if g.isSameTxn(qqg.guard.txnMeta()) {
panic(errors.AssertionFailedf(
"SKIP LOCKED request should not find another waiting request from the same transaction",
))
}
if lock.Conflicts(qqg.mode, makeLockMode(str, g.txn, g.ts), &g.lt.settings.SV) {
return true, nil // the conflict isn't with a lock holder, nil is returned
}
}
return false, nil // no conflict
}

func (g *lockTableGuardImpl) notify() {
Expand Down Expand Up @@ -1912,7 +1935,7 @@ func (l *lockState) scanAndMaybeEnqueue(g *lockTableGuardImpl, notify bool) (wai

// It is possible that the lock is already held by this request's
// transaction, and it is held with a lock strength good enough for it.
if l.alreadyHoldsLockAndIsAllowedToProceed(g) {
if l.alreadyHoldsLockAndIsAllowedToProceed(g, g.curStrength()) {
return false /* wait */
}

Expand Down Expand Up @@ -1994,11 +2017,13 @@ func (l *lockState) constructWaitingState(g *lockTableGuardImpl) waitingState {

// alreadyHoldsLockAndIsAllowedToProceed returns true if the request, referenced
// by the supplied lock table guard, is allowed to proceed because its
// transaction already holds the lock with an equal or higher lock strength.
// Otherwise, false is returned.
// transaction already holds the lock with an equal or higher lock strength
// compared to the one supplied. Otherwise, false is returned.
//
// REQUIRES: l.mu to be locked.
func (l *lockState) alreadyHoldsLockAndIsAllowedToProceed(g *lockTableGuardImpl) bool {
func (l *lockState) alreadyHoldsLockAndIsAllowedToProceed(
g *lockTableGuardImpl, str lock.Strength,
) bool {
lockHolderTxn, _ := l.getLockHolder()
if lockHolderTxn == nil {
return false // no one holds the lock
Expand All @@ -2012,7 +2037,7 @@ func (l *lockState) alreadyHoldsLockAndIsAllowedToProceed(g *lockTableGuardImpl)
// is trying to promote a lock it previously acquired. In such cases, the
// existence of a lock with weaker strength doesn't do much for this request.
// It's no different than the case where its trying to acquire a fresh lock.
return g.curStrength() <= heldMode.Strength ||
return str <= heldMode.Strength ||
// TODO(arul): We want to allow requests that are writing to keys that they
// hold exclusive locks on to "jump ahead" of any potential waiters. This
// prevents deadlocks. The logic here is a bandaid until we implement a
Expand All @@ -2022,7 +2047,7 @@ func (l *lockState) alreadyHoldsLockAndIsAllowedToProceed(g *lockTableGuardImpl)
// storing them in the list of queuedWriters. Instead of sorting the list
// of queuedWriters just based on sequence numbers alone, we'll instead use
// (belongsToALockHolderTxn, sequence number) to construct the sort order.
(g.curStrength() == lock.Intent && heldMode.Strength == lock.Exclusive)
(str == lock.Intent && heldMode.Strength == lock.Exclusive)
}

// conflictsWithLockHolder returns true if the request, referenced by the
Expand Down
Loading

0 comments on commit 6342e6f

Please sign in to comment.