Skip to content

Commit

Permalink
Merge #112732
Browse files Browse the repository at this point in the history
112732: concurrency: correctly handle lock updates that decrease its strength r=nvanbenschoten a=arulajmani

When a lock is updated on behalf of a non-finalized transaction, the lock table simply forgets any replicated lock information it was previously tracking. Now, if a lock is held with both replicated and unreplicated durability, and the replicated lock's strength is stronger than the unreplicated lock's strength, forgetting the replicated lock means there may be requests waiting on a key that no longer conflict with what's being tracked in memory[*]. In such cases, requests that no longer conflict with the in-memory tracking should no longer wait on this key -- they should be allowed to proceed with their scan. It's not guaranteed that they no longer conflict with the lock -- they might rediscover the replicated lock again and start waiting. However, if they no longer conflict with the lock, blocking them indefinitely can lead to deadlocks.

[*] Concretely, consider the following construction:
- key a: [txn 1, (repl intent ts@10), (unrepl shared)]
  - wait-queue readers: [none ts@12], [none ts@15]
  - wait-queue locking requests: [shared], [shared], [exclusive] [shared] In this case, updating the intent's timestamp to ts@14 (and forgetting it) should allow both the non-locking readers and first two shared locking requests to become compatible with what's being tracked in memory for this key.

Fixes #112608

Release note: None

Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
craig[bot] and arulajmani committed Nov 3, 2023
2 parents 58d5679 + e875c8a commit e30d643
Show file tree
Hide file tree
Showing 6 changed files with 573 additions and 39 deletions.
11 changes: 10 additions & 1 deletion pkg/kv/kvserver/concurrency/lock/locking.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func init() {
// Conflict rules are as described in the compatibility matrix in locking.pb.go.
func Conflicts(m1, m2 Mode, sv *settings.Values) bool {
if m1.Empty() || m2.Empty() {
panic("cannot check conflict for uninitialized locks")
return false // no conflict with empty lock modes
}
if m1.Strength > m2.Strength {
// Conflict rules are symmetric, so reduce the number of cases we need to
Expand Down Expand Up @@ -127,6 +127,15 @@ func (m *Mode) Empty() bool {
return m.Strength == None && m.Timestamp.IsEmpty()
}

// Weaker returns true if the receiver conflicts with fewer requests than the
// Mode supplied.
func (m Mode) Weaker(o Mode) bool {
if m.Strength == o.Strength {
return !m.Timestamp.Less(o.Timestamp) // lower timestamp conflicts with more requests
}
return m.Strength < o.Strength
}

// MakeModeNone constructs a Mode with strength None.
func MakeModeNone(ts hlc.Timestamp, isoLevel isolation.Level) Mode {
return Mode{
Expand Down
85 changes: 85 additions & 0 deletions pkg/kv/kvserver/concurrency/lock/locking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,3 +404,88 @@ func TestCheckLockConflicts_IntentWithIntent(t *testing.T) {
)
}
}

func TestCheckLockConflicts_Empty(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tsLock := makeTS(2)
st := cluster.MakeTestingClusterSettings()
for _, mode := range []lock.Mode{
lock.MakeModeIntent(tsLock),
lock.MakeModeExclusive(tsLock, isolation.Serializable),
lock.MakeModeUpdate(),
lock.MakeModeShared(),
lock.MakeModeNone(tsLock, isolation.Serializable),
} {
var empty lock.Mode
testCheckLockConflicts(t, fmt.Sprintf("empty with %s", mode), empty, mode, st, false)
}
}

// TestLockModeWeaker tests strength comparison semantics for various lock mode
// combinations.
func TestLockModeWeaker(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tsBelow := makeTS(1)
tsLock := makeTS(2)
tsAbove := makeTS(3)

testCases := []struct {
m1 lock.Mode
m2 lock.Mode
exp bool
}{
{
m1: lock.MakeModeNone(tsLock, isolation.Serializable),
m2: lock.MakeModeNone(tsBelow, isolation.Serializable), // stronger
exp: true,
},
{
m1: lock.MakeModeNone(tsLock, isolation.Serializable), // stronger
m2: lock.MakeModeNone(tsAbove, isolation.Serializable),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeNone(tsBelow, isolation.Serializable),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock),
m2: lock.MakeModeIntent(tsBelow), // stronger
exp: true,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeIntent(tsAbove),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeShared(),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeUpdate(),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeExclusive(tsBelow, isolation.Serializable),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeExclusive(tsAbove, isolation.Serializable),
exp: false,
},
}

for _, tc := range testCases {
require.Equal(t, tc.exp, tc.m1.Weaker(tc.m2))
}
}
142 changes: 104 additions & 38 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2950,20 +2950,9 @@ func (kl *keyLocks) acquireLock(
}
afterTs := tl.writeTS()
if beforeTs.Less(afterTs) {
// Check if the lock's timestamp has increased as a result of this
// lock acquisition. If it has, some waiting readers may no longer
// conflict with this lock, so they can be allowed through. We only
// need to do so if the lock is held with a strength of
// {Exclusive,Intent}, as non-locking readers do not conflict with
// any other lock strength (read: Shared). Conveniently, a key can
// only ever be locked by a single transaction with strength
// {Exclusive,Intent}. This means we don't have to bother with maintaining
// a low watermark timestamp across multiple lock holders by being cute
// here.
if tl.getLockMode().Strength == lock.Exclusive ||
tl.getLockMode().Strength == lock.Intent {
kl.increasedLockTs(afterTs)
}
// If the lock's timestamp has increased as a result of this lock
// acquisition, the queue of waiting readers might need to be recomputed.
kl.recomputeWaitQueues(st)
}
return nil
}
Expand Down Expand Up @@ -3200,14 +3189,18 @@ func (kl *keyLocks) tryClearLock(force bool) bool {
// transaction, else the lock is updated. Returns whether the keyLocks struct
// can be garbage collected, and whether it was held by the txn.
// Acquires l.mu.
func (kl *keyLocks) tryUpdateLock(up *roachpb.LockUpdate) (heldByTxn, gc bool) {
func (kl *keyLocks) tryUpdateLock(
up *roachpb.LockUpdate, st *cluster.Settings,
) (heldByTxn, gc bool) {
kl.mu.Lock()
defer kl.mu.Unlock()
return kl.tryUpdateLockLocked(*up)
return kl.tryUpdateLockLocked(*up, st)
}

// REQUIRES: kl.mu is locked.
func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bool) {
func (kl *keyLocks) tryUpdateLockLocked(
up roachpb.LockUpdate, st *cluster.Settings,
) (heldByTxn, gc bool) {
if kl.isEmptyLock() {
// Already free. This can happen when an unreplicated lock is removed in
// tryActiveWait due to the txn being in the txnStatusCache.
Expand Down Expand Up @@ -3242,6 +3235,7 @@ func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo
txn := &up.Txn
ts := up.Txn.WriteTimestamp
beforeTs := tl.writeTS()
beforeStr := tl.getLockMode().Strength
advancedTs := beforeTs.Less(ts)
isLocked := false
// The MVCC keyspace is the source of truth about the disposition of a
Expand Down Expand Up @@ -3321,40 +3315,108 @@ func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo
return true, gc
}

if advancedTs {
// We only need to let through non-locking readers if the lock is held with
// strength {Exclusive,Intent}. See acquireLock for an explanation as to
// why.
if tl.getLockMode().Strength == lock.Exclusive ||
tl.getLockMode().Strength == lock.Intent {
kl.increasedLockTs(ts)
}
afterStr := tl.getLockMode().Strength

if beforeStr != afterStr || advancedTs {
kl.recomputeWaitQueues(st)
}
// Else no change for waiters. This can happen due to a race between different
// callers of UpdateLocks().

return true, false
}

// The lock holder timestamp has increased. Some of the waiters may no longer
// need to wait.
// recomputeWaitQueues goes through the receiver's wait queues and recomputes
// whether actively waiting requests should continue to do so, given the key's
// locks holders and other waiting requests. Such computation is necessary when
// a lock's strength has decreased[1] or locking requests have dropped out of
// wait queue's[2] without actually acquiring the lock or the lock's timestamp
// has advanced.
//
// REQUIRES: kl.mu is locked.
func (kl *keyLocks) increasedLockTs(newTs hlc.Timestamp) {
distinguishedRemoved := false
// [1] This can happen as a result of savepoint rollback or when the lock table
// stops tracking a replicated lock because of a PUSH_TIMESTAMP that
// successfully bumps the pushee's timestamp.
// [2] A locking request that doesn't conflict with any held lock(s) may still
// have to actively wait if it conflicts with a lower sequence numbered request
// already in the lock's wait queue. Locking requests dropping out of a lock's
// wait queue can therefore result in other requests no longer needing to
// actively wait.
//
// TODO(arul): We could optimize this function if we had information about the
// context it was being called in.
//
// REQUIRES: kl.mu to be locked.
func (kl *keyLocks) recomputeWaitQueues(st *cluster.Settings) {
// Determine and maintain strongest mode for this key. Note that this logic is
// generalized for Update locks already -- things could be tightened if we
// only considered None, Shared, Exclusive, and Intent locking strengths
// (which are the only ones that exist at the time of writing).
var strongestMode lock.Mode
// Go through the list of lock holders.
for e := kl.holders.Front(); e != nil; e = e.Next() {
mode := e.Value.getLockMode()
if strongestMode.Weaker(mode) {
strongestMode = mode
}
}

// Go through the list of non-locking readers (all which are actively waiting,
// by definition) and check if any of them no longer conflict with the lock
// holder(s).
for e := kl.waitingReaders.Front(); e != nil; {
g := e.Value
reader := e.Value
curr := e
e = e.Next()
if g.ts.Less(newTs) {
distinguishedRemoved = distinguishedRemoved || kl.removeReader(curr)
if !lock.Conflicts(reader.curLockMode(), strongestMode, &st.SV) {
kl.removeReader(curr)
}
// Else don't inform an active waiter which continues to be an active waiter
// despite the timestamp increase.
}
if distinguishedRemoved {
kl.tryMakeNewDistinguished()

// Go through the list of locking requests and check if any that are actively
// waiting no longer need to do so. We start the iteration from the front of
// the linked list as requests are stored in increasing sequence number order.
// Moreover, as locking requests conflict with both a lock's holder(s) and
// lower sequence numbered requests waiting in the queue, we must update
// strongestMode as we go along.
for e := kl.queuedLockingRequests.Front(); e != nil; {
qlr := e.Value
curr := e
e = e.Next()
if lock.Conflicts(qlr.mode, strongestMode, &st.SV) {
break
}
removed := false
if qlr.active {
// A queued locking request, that's actively waiting, no longer conflicts
// with locks on this key -- it can be allowed to proceed. There's two
// cases:
// 1. If it's a transactional request, it needs to acquire a claim by
// holding its place in the lock wait queue while marking itself as
// inactive.
// 2. Non-transactional requests do not acquire claims, so they can be
// removed from the wait queue.
if qlr.guard.txn == nil {
kl.removeLockingRequest(curr)
removed = true
} else {
qlr.active = false // mark as inactive
if qlr.guard == kl.distinguishedWaiter {
// A new distinguished waiter will be selected by informActiveWaiters.
kl.distinguishedWaiter = nil
}
qlr.guard.mu.Lock()
qlr.guard.doneActivelyWaitingAtLock()
qlr.guard.mu.Unlock()
}
}
// Locking requests conflict with both the lock holder(s) and other lower
// sequence numbered locking requests in the lock's wait queue, so we may
// need to update strongestMode before moving on with our iteration.
if !removed && strongestMode.Weaker(qlr.mode) {
strongestMode = qlr.mode
}
}
kl.informActiveWaiters()
}

// removeLockingRequest removes the locking request (or non-transactional
Expand Down Expand Up @@ -3609,6 +3671,10 @@ func (kl *keyLocks) releaseWaitersOnKeyUnlocked() (gc bool) {
// REQUIRES: kl.mu is locked.
// REQUIRES: the (receiver) lock must not be held.
// REQUIRES: there should not be any waitingReaders in the lock's wait queues.
//
// TODO(arul): There's a lot of overlap between this method and
// recomputeWaitQueues. We should simplify things by trying to replace all
// usages of this method with recomputeWaitQueues.
func (kl *keyLocks) maybeReleaseCompatibleLockingRequests() {
if kl.isLocked() {
panic("maybeReleaseCompatibleLockingRequests called when lock is held")
Expand Down Expand Up @@ -4158,7 +4224,7 @@ func (t *lockTableImpl) updateLockInternal(up *roachpb.LockUpdate) (heldByTxn bo
var locksToGC []*keyLocks
heldByTxn = false
changeFunc := func(l *keyLocks) {
held, gc := l.tryUpdateLock(up)
held, gc := l.tryUpdateLock(up, t.settings)
heldByTxn = heldByTxn || held
if gc {
locksToGC = append(locksToGC, l)
Expand Down
Loading

0 comments on commit e30d643

Please sign in to comment.