Skip to content

Commit

Permalink
concurrency: use lock modes to detect conflicts in SKIP LOCKED
Browse files Browse the repository at this point in the history
This patch switches isKeyLockedByConflictingTxn to make use of lock
modes to detect conflicts. As a result, SKIP LOCKED requests correctly
interact with shared locks. We add a few tests to demonstrate this.

Fixes cockroachdb#108715

Release note: None
  • Loading branch information
arulajmani committed Aug 17, 2023
1 parent e160427 commit faee1ac
Show file tree
Hide file tree
Showing 4 changed files with 310 additions and 84 deletions.
26 changes: 12 additions & 14 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,22 +773,20 @@ type lockTableGuard interface {
// that conflict.
CheckOptimisticNoConflicts(*lockspanset.LockSpanSet) (ok bool)

// IsKeyLockedByConflictingTxn returns whether the specified key is claimed
// (see claimantTxn()) by a conflicting transaction in the lockTableGuard's
// snapshot of the lock table, given the caller's own desired locking
// strength. If so, true is returned. If the key is locked, the lock holder is
// also returned. Otherwise, if the key was claimed by a concurrent request
// still sequencing through the lock table, but the lock isn't held (yet), nil
// is also returned.
// IsKeyLockedByConflictingTxn returns whether the specified key is locked by
// a conflicting transaction in the lock TableGuard's snapshot of the lock
// table, given the caller's own desired locking strength. If so, true is
// returned and so is the lock holder. If the lock is held by the transaction
// itself, there's no conflict to speak of, so false is returned.
//
// If the lock has been claimed (held or otherwise) by the transaction itself,
// there's no conflict to speak of, so false is returned. In cases where the
// lock isn't held, but the lock has been claimed by the transaction itself,
// we do not make a distinction about which request claimed the key -- it
// could either be the request itself, or a different concurrent request from
// the same transaction; The specifics do not affect the caller.
// This method is used by requests in conjunction with the SkipLocked wait
// This method is use by requests in conjunction with the SkipLocked wait
// policy to determine which keys they should skip over during evaluation.
//
// If the supplied lock strength is locking (!= lock.None), then any queued
// locking requests that came before the lockTableGuard will also be checked
// for conflicts. This helps prevent a stream of locking SKIP LOCKED requests
// from starving out regular locking requests. In such cases, true is
// returned, but so is nil.
IsKeyLockedByConflictingTxn(roachpb.Key, lock.Strength) (bool, *enginepb.TxnMeta)
}

Expand Down
72 changes: 46 additions & 26 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,31 +692,49 @@ func (g *lockTableGuardImpl) IsKeyLockedByConflictingTxn(
// The lock is empty but has not yet been deleted.
return false, nil
}
conflictingTxn, held := l.claimantTxn()
assert(conflictingTxn != nil, "non-empty lockState with no claimant transaction")
if !held {
if strength == lock.None {
// Non-locking reads only care about locks that are held.
return false, nil
}
if g.isSameTxn(conflictingTxn) {
return false, nil

if l.alreadyHoldsLockAndIsAllowedToProceed(g, strength) {
// If another request from this transaction has already locked this key with
// sufficient locking strength then there's no conflict; we can proceed.
return false, nil
}

if l.isHeld() {
lockHolderTxn, _ := l.getLockHolder()
if !g.isSameTxn(lockHolderTxn) &&
lock.Conflicts(l.getLockMode(), makeLockMode(strength, g.txn, g.ts), &g.lt.settings.SV) {
return true, l.holder.txn // they key is locked by some other transaction; return the holder
}
// If the key is claimed but the lock isn't held (yet), nil is returned.
return true, nil
}
// Key locked.
txn, ts := l.getLockHolder()
if strength == lock.None && g.ts.Less(ts) {
// Non-locking read below lock's timestamp.

// There's no conflict with the lock holder itself. However, there may be
// other locking requests that came before us that we may conflict with. This
// ensures fairness by preventing a stream of locking[1] SKIP LOCKED requests
// from starving out regular locking requests.
if strength == lock.None { // [1] we only need to do this checking for locking requests
// TODO(arul): Is there a hazard for a stream of non-locking requests to
// starve out a writer, by perpetually bumping the timestamp cache from
// underneath it?
return false, nil
}
if g.isSameTxn(txn) {
// Already locked by this txn.
return false, nil

for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
qqg := e.Value.(*queuedGuard)
if qqg.guard.seqNum > g.seqNum {
// We only need to check for conflicts with requests that came before us
// (read: have lower sequence numbers than us).
break
}
if qqg.guard.txn.ID == g.txn.ID {
panic(errors.AssertionFailedf(
"SKIP LOCKED request should not find another waiting request from the same transaction",
))
}
if lock.Conflicts(qqg.mode, makeLockMode(strength, g.txn, g.ts), &g.lt.settings.SV) {
return true, nil // the conflict isn't with a lock holder, nil is returned
}
}
// "If the key is locked, the lock holder is also returned."
return true, txn
return false, nil // no conflict
}

func (g *lockTableGuardImpl) notify() {
Expand Down Expand Up @@ -1912,7 +1930,7 @@ func (l *lockState) scanAndMaybeEnqueue(g *lockTableGuardImpl, notify bool) (wai

// It is possible that the lock is already held by this request's
// transaction, and it is held with a lock strength good enough for it.
if l.alreadyHoldsLockAndIsAllowedToProceed(g) {
if l.alreadyHoldsLockAndIsAllowedToProceed(g, g.curStrength()) {
return false /* wait */
}

Expand Down Expand Up @@ -1994,11 +2012,13 @@ func (l *lockState) constructWaitingState(g *lockTableGuardImpl) waitingState {

// alreadyHoldsLockAndIsAllowedToProceed returns true if the request, referenced
// by the supplied lock table guard, is allowed to proceed because its
// transaction already holds the lock with an equal or higher lock strength.
// Otherwise, false is returned.
// transaction already holds the lock with an equal or higher lock strength
// compared to the one supplied. Otherwise, false is returned.
//
// REQUIRES: l.mu to be locked.
func (l *lockState) alreadyHoldsLockAndIsAllowedToProceed(g *lockTableGuardImpl) bool {
func (l *lockState) alreadyHoldsLockAndIsAllowedToProceed(
g *lockTableGuardImpl, str lock.Strength,
) bool {
lockHolderTxn, _ := l.getLockHolder()
if lockHolderTxn == nil {
return false // no one holds the lock
Expand All @@ -2012,7 +2032,7 @@ func (l *lockState) alreadyHoldsLockAndIsAllowedToProceed(g *lockTableGuardImpl)
// is trying to promote a lock it previously acquired. In such cases, the
// existence of a lock with weaker strength doesn't do much for this request.
// It's no different than the case where its trying to acquire a fresh lock.
return g.curStrength() <= heldMode.Strength ||
return str <= heldMode.Strength ||
// TODO(arul): We want to allow requests that are writing to keys that they
// hold exclusive locks on to "jump ahead" of any potential waiters. This
// prevents deadlocks. The logic here is a bandaid until we implement a
Expand All @@ -2022,7 +2042,7 @@ func (l *lockState) alreadyHoldsLockAndIsAllowedToProceed(g *lockTableGuardImpl)
// storing them in the list of queuedWriters. Instead of sorting the list
// of queuedWriters just based on sequence numbers alone, we'll instead use
// (belongsToALockHolderTxn, sequence number) to construct the sort order.
(g.curStrength() == lock.Intent && heldMode.Strength == lock.Exclusive)
(str == lock.Intent && heldMode.Strength == lock.Exclusive)
}

// conflictsWithLockHolder returns true if the request, referenced by the
Expand Down
Loading

0 comments on commit faee1ac

Please sign in to comment.