Skip to content

Commit

Permalink
concurrency: correctly handle lock updates that decrease its strength
Browse files Browse the repository at this point in the history
When a lock is updated on behalf of a non-finalized transaction, the
lock table simply forgets any replicated lock information it was
previously tracking. Now, if a lock is held with both replicated and
unreplicated durability, and the replicated lock's strength is stronger
than the unreplicated lock's strength, forgetting the replicated lock
means there may be requests waiting on a key that no longer conflict
with what's being tracked in memory[1]. In such cases, requests that no
longer conflict with the in-memory tracking should no longer wait on
this key -- they should be allowed to proceed with their scan. It's
not guaranteed that they no longer conflict with the lock -- they might
rediscover the replicated lock again and start waiting. However, if they
no longer conflict with the lock, blocking them indefinitely can lead to
undetected deadlocks in rare cases[2].

[1] Concretely, consider the following construction:
- key a: [txn 1, (repl intent ts@10), (unrepl shared)]
  - wait-queue readers: [none ts@12], [none ts@15]
  - wait-queue locking requests: [shared], [shared], [exclusive] [shared]
In this case, updating the intent's timestamp to ts@14 (and forgetting
it) should allow both the non-locking readers and first two shared
locking requests to become compatible with what's being tracked in
memory for this key.
[2] This can happen if the lock update corresponds to a successful
PUSH_TIMESTAMP. The pusher will be blocked in the lock table waiter
waiting for a `waitingState` update that never comes. Now, if there
was a dependency cycle between the pusher and pushee had the push
timestamp been blocking, we would find ourselves in an undetected
deeadlock situation.

Fixes #112608

Release note: None
  • Loading branch information
arulajmani committed Nov 3, 2023
1 parent 5aa6a0a commit 7745487
Show file tree
Hide file tree
Showing 6 changed files with 562 additions and 36 deletions.
11 changes: 10 additions & 1 deletion pkg/kv/kvserver/concurrency/lock/locking.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func init() {
// Conflict rules are as described in the compatibility matrix in locking.pb.go.
func Conflicts(m1, m2 Mode, sv *settings.Values) bool {
if m1.Empty() || m2.Empty() {
panic("cannot check conflict for uninitialized locks")
return false // no conflict with empty lock modes
}
if m1.Strength > m2.Strength {
// Conflict rules are symmetric, so reduce the number of cases we need to
Expand Down Expand Up @@ -127,6 +127,15 @@ func (m *Mode) Empty() bool {
return m.Strength == None && m.Timestamp.IsEmpty()
}

// Weaker returns true if the receiver conflicts with fewer requests than the
// Mode supplied.
func (m Mode) Weaker(o Mode) bool {
if m.Strength == o.Strength {
return !m.Timestamp.Less(o.Timestamp) // lower timestamp conflicts with more requests
}
return m.Strength < o.Strength
}

// MakeModeNone constructs a Mode with strength None.
func MakeModeNone(ts hlc.Timestamp, isoLevel isolation.Level) Mode {
return Mode{
Expand Down
67 changes: 67 additions & 0 deletions pkg/kv/kvserver/concurrency/lock/locking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,3 +404,70 @@ func TestCheckLockConflicts_IntentWithIntent(t *testing.T) {
)
}
}

// TestLockModeWeaker tests strength comparison semantics for various lock mode
// combinations.
func TestLockModeWeaker(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tsBelow := makeTS(1)
tsLock := makeTS(2)
tsAbove := makeTS(3)

testCases := []struct {
m1 lock.Mode
m2 lock.Mode
exp bool
}{
{
m1: lock.MakeModeNone(tsLock, isolation.Serializable),
m2: lock.MakeModeNone(tsBelow, isolation.Serializable), // stronger
exp: true,
},
{
m1: lock.MakeModeNone(tsLock, isolation.Serializable), // stronger
m2: lock.MakeModeNone(tsAbove, isolation.Serializable),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeNone(tsBelow, isolation.Serializable),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock),
m2: lock.MakeModeIntent(tsBelow), // stronger
exp: true,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeIntent(tsAbove),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeShared(),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeUpdate(),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeExclusive(tsBelow, isolation.Serializable),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeExclusive(tsAbove, isolation.Serializable),
exp: false,
},
}

for _, tc := range testCases {
require.Equal(t, tc.exp, tc.m1.Weaker(tc.m2))
}
}
146 changes: 111 additions & 35 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2950,19 +2950,18 @@ func (kl *keyLocks) acquireLock(
}
afterTs := tl.writeTS()
if beforeTs.Less(afterTs) {
// Check if the lock's timestamp has increased as a result of this
// lock acquisition. If it has, some waiting readers may no longer
// conflict with this lock, so they can be allowed through. We only
// need to do so if the lock is held with a strength of
// {Exclusive,Intent}, as non-locking readers do not conflict with
// any other lock strength (read: Shared). Conveniently, a key can
// only ever be locked by a single transaction with strength
// {Exclusive,Intent}. This means we don't have to bother with maintaining
// a low watermark timestamp across multiple lock holders by being cute
// here.
// If the lock's timestamp has increased as a result of this lock
// acquisition, some waiting readers may no longer conflict with this
// lock, and should be allowed through. Note that this is only possible if
// the lock is held with strength Intent or Exclusive[*], as other lock
// strengths do not block non-locking readers.
//
// [*] TODO(arul): Once unreplicated exclusive locks stop blocking
// non-locking readers, we'd only need to recompute wait queues if the
// lock was held with Intent lock strength.
if tl.getLockMode().Strength == lock.Exclusive ||
tl.getLockMode().Strength == lock.Intent {
kl.increasedLockTs(afterTs)
kl.recomputeWaitQueues(st)
}
}
return nil
Expand Down Expand Up @@ -3200,14 +3199,18 @@ func (kl *keyLocks) tryClearLock(force bool) bool {
// transaction, else the lock is updated. Returns whether the keyLocks struct
// can be garbage collected, and whether it was held by the txn.
// Acquires l.mu.
func (kl *keyLocks) tryUpdateLock(up *roachpb.LockUpdate) (heldByTxn, gc bool) {
func (kl *keyLocks) tryUpdateLock(
up *roachpb.LockUpdate, st *cluster.Settings,
) (heldByTxn, gc bool) {
kl.mu.Lock()
defer kl.mu.Unlock()
return kl.tryUpdateLockLocked(*up)
return kl.tryUpdateLockLocked(*up, st)
}

// REQUIRES: kl.mu is locked.
func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bool) {
func (kl *keyLocks) tryUpdateLockLocked(
up roachpb.LockUpdate, st *cluster.Settings,
) (heldByTxn, gc bool) {
if kl.isEmptyLock() {
// Already free. This can happen when an unreplicated lock is removed in
// tryActiveWait due to the txn being in the txnStatusCache.
Expand Down Expand Up @@ -3242,6 +3245,7 @@ func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo
txn := &up.Txn
ts := up.Txn.WriteTimestamp
beforeTs := tl.writeTS()
beforeStr := tl.getLockMode().Strength
advancedTs := beforeTs.Less(ts)
isLocked := false
// The MVCC keyspace is the source of truth about the disposition of a
Expand Down Expand Up @@ -3321,40 +3325,108 @@ func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo
return true, gc
}

if advancedTs {
// We only need to let through non-locking readers if the lock is held with
// strength {Exclusive,Intent}. See acquireLock for an explanation as to
// why.
if tl.getLockMode().Strength == lock.Exclusive ||
tl.getLockMode().Strength == lock.Intent {
kl.increasedLockTs(ts)
}
afterStr := tl.getLockMode().Strength

if beforeStr != afterStr || advancedTs {
kl.recomputeWaitQueues(st)
}
// Else no change for waiters. This can happen due to a race between different
// callers of UpdateLocks().

return true, false
}

// The lock holder timestamp has increased. Some of the waiters may no longer
// need to wait.
// recomputeWaitQueues goes through the receiver's wait queues and recomputes
// whether actively waiting requests should continue to do so, given the key's
// locks holders and other waiting requests. Such computation is necessary when
// a lock's strength has decreased[1] or locking requests have dropped out of
// wait queue's[2] without actually acquiring the lock or the lock's timestamp
// has advanced.
//
// REQUIRES: kl.mu is locked.
func (kl *keyLocks) increasedLockTs(newTs hlc.Timestamp) {
distinguishedRemoved := false
// [1] This can happen as a result of savepoint rollback or when the lock table
// stops tracking a replicated lock because of a PUSH_TIMESTAMP that
// successfully bumps the pushee's timestamp.
// [2] A locking request that doesn't conflict with any held lock(s) may still
// have to actively wait if it conflicts with a lower sequence numbered request
// already in the lock's wait queue. Locking requests dropping out of a lock's
// wait queue can therefore result in other requests no longer needing to
// actively wait.
//
// TODO(arul): We could optimize this function if we had information about the
// context it was being called in.
//
// REQUIRES: kl.mu to be locked.
func (kl *keyLocks) recomputeWaitQueues(st *cluster.Settings) {
// Determine and maintain strongest mode for this key. Note that this logic is
// generalized for Update locks already -- things could be tightened if we
// only considered None, Shared, Exclusive, and Intent locking strengths
// (which are the only ones that exist at the time of writing).
var strongestMode lock.Mode
// Go through the list of lock holders.
for e := kl.holders.Front(); e != nil; e = e.Next() {
mode := e.Value.getLockMode()
if strongestMode.Weaker(mode) {
strongestMode = mode
}
}

// Go through the list of non-locking readers (all which are actively waiting,
// by definition) and check if any of them no longer conflict with the lock
// holder(s).
for e := kl.waitingReaders.Front(); e != nil; {
g := e.Value
reader := e.Value
curr := e
e = e.Next()
if g.ts.Less(newTs) {
distinguishedRemoved = distinguishedRemoved || kl.removeReader(curr)
if !lock.Conflicts(reader.curLockMode(), strongestMode, &st.SV) {
kl.removeReader(curr)
}
// Else don't inform an active waiter which continues to be an active waiter
// despite the timestamp increase.
}
if distinguishedRemoved {
kl.tryMakeNewDistinguished()

// Go through the list of locking requests and check if any that are actively
// waiting no longer need to do so. We start the iteration from the front of
// the linked list as requests are stored in increasing sequence number order.
// Moreover, as locking requests conflict with both a lock's holder(s) and
// lower sequence numbered requests waiting in the queue, we must update
// strongestMode as we go along.
for e := kl.queuedLockingRequests.Front(); e != nil; {
qlr := e.Value
curr := e
e = e.Next()
if lock.Conflicts(qlr.mode, strongestMode, &st.SV) {
break
}
removed := false
if qlr.active {
// A queued locking request, that's actively waiting, no longer conflicts
// with locks on this key -- it can be allowed to proceed. There's two
// cases:
// 1. If it's a transactional request, it needs to acquire a claim by
// holding its place in the lock wait queue while marking itself as
// inactive.
// 2. Non-transactional requests do not acquire claims, so they can be
// removed from the wait queue.
if qlr.guard.txn == nil {
kl.removeLockingRequest(curr)
removed = true
} else {
qlr.active = false // mark as inactive
if qlr.guard == kl.distinguishedWaiter {
// A new distinguished waiter will be selected by informActiveWaiters.
kl.distinguishedWaiter = nil
}
qlr.guard.mu.Lock()
qlr.guard.doneActivelyWaitingAtLock()
qlr.guard.mu.Unlock()
}
}
// Locking requests conflict with both the lock holder(s) and other lower
// sequence numbered locking requests in the lock's wait queue, so we may
// need to update strongestMode before moving on with our iteration.
if !removed && strongestMode.Weaker(qlr.mode) {
strongestMode = qlr.mode
}
}
kl.informActiveWaiters()
}

// removeLockingRequest removes the locking request (or non-transactional
Expand Down Expand Up @@ -3609,6 +3681,10 @@ func (kl *keyLocks) releaseWaitersOnKeyUnlocked() (gc bool) {
// REQUIRES: kl.mu is locked.
// REQUIRES: the (receiver) lock must not be held.
// REQUIRES: there should not be any waitingReaders in the lock's wait queues.
//
// TODO(arul): There's a lot of overlap between this method and
// recomputeWaitQueues. We should simplify things by trying to replace all
// usages of this method with recomputeWaitQueues.
func (kl *keyLocks) maybeReleaseCompatibleLockingRequests() {
if kl.isLocked() {
panic("maybeReleaseCompatibleLockingRequests called when lock is held")
Expand Down Expand Up @@ -4158,7 +4234,7 @@ func (t *lockTableImpl) updateLockInternal(up *roachpb.LockUpdate) (heldByTxn bo
var locksToGC []*keyLocks
heldByTxn = false
changeFunc := func(l *keyLocks) {
held, gc := l.tryUpdateLock(up)
held, gc := l.tryUpdateLock(up, t.settings)
heldByTxn = heldByTxn || held
if gc {
locksToGC = append(locksToGC, l)
Expand Down
Loading

0 comments on commit 7745487

Please sign in to comment.