Skip to content

Commit

Permalink
concurrency: correctly handle lock updates that decrease its strength
Browse files Browse the repository at this point in the history
When a lock is updated on behalf of a non-finalized transaction, the
lock table simply forgets any replicated lock information it was
previously tracking. Now, if a lock is held with both replicated and
unreplicated durability, and the replicated lock's strength is stronger
than the unreplicated lock's strength, forgetting the replicated lock
means there may be requests waiting on a key that no longer conflict
with what's being tracked in memory[1]. In such cases, requests that no
longer conflict with the in-memory tracking should no longer wait on
this key -- they should be allowed to proceed with their scan. It's
not guaranteed that they no longer conflict with the lock -- they might
rediscover the replicated lock again and start waiting. However, if they
no longer conflict with the lock, blocking them indefinitely can lead to
undetected deadlocks in rare cases[2].

[1] Concretely, consider the following construction:
- key a: [txn 1, (repl intent ts@10), (unrepl shared)]
  - wait-queue readers: [none ts@12], [none ts@15]
  - wait-queue locking requests: [shared], [shared], [exclusive] [shared]
In this case, updating the intent's timestamp to ts@14 (and forgetting
it) should allow both the non-locking readers and first two shared
locking requests to become compatible with what's being tracked in
memory for this key.
[2] This can happen if the lock update corresponds to a successful
PUSH_TIMESTAMP. The pusher will be blocked in the lock table waiter
waiting for a `waitingState` update that never comes. Now, if there
was a dependency cycle between the pusher and pushee had the push
timestamp been blocking, we would find ourselves in an undetected
deeadlock situation.

Fixes cockroachdb#112608

Release note: None
  • Loading branch information
arulajmani committed Nov 1, 2023
1 parent 974b43d commit 1d21c04
Show file tree
Hide file tree
Showing 6 changed files with 540 additions and 12 deletions.
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/concurrency/lock/locking.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ func (m *Mode) Empty() bool {
return m.Strength == None && m.Timestamp.IsEmpty()
}

// Less returns true if the receiver conflicts with fewer requests than the Mode
// supplied.
func (m Mode) Less(o Mode) bool {
if m.Strength == o.Strength {
return !m.Timestamp.Less(o.Timestamp) // lower timestamp conflicts with more requests
}
return m.Strength < o.Strength
}

// MakeModeNone constructs a Mode with strength None.
func MakeModeNone(ts hlc.Timestamp, isoLevel isolation.Level) Mode {
return Mode{
Expand Down
65 changes: 65 additions & 0 deletions pkg/kv/kvserver/concurrency/lock/locking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,3 +404,68 @@ func TestCheckLockConflicts_IntentWithIntent(t *testing.T) {
)
}
}

func TestLockModeLess(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tsBelow := makeTS(1)
tsLock := makeTS(2)
tsAbove := makeTS(3)

testCases := []struct {
m1 lock.Mode
m2 lock.Mode
exp bool
}{
{
m1: lock.MakeModeNone(tsLock, isolation.Serializable),
m2: lock.MakeModeNone(tsBelow, isolation.Serializable), // stronger
exp: true,
},
{
m1: lock.MakeModeNone(tsLock, isolation.Serializable), // stronger
m2: lock.MakeModeNone(tsAbove, isolation.Serializable),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeNone(tsBelow, isolation.Serializable),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock),
m2: lock.MakeModeIntent(tsBelow), // stronger
exp: true,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeIntent(tsAbove),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeShared(),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeUpdate(),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeExclusive(tsBelow, isolation.Serializable),
exp: false,
},
{
m1: lock.MakeModeIntent(tsLock), // stronger
m2: lock.MakeModeExclusive(tsAbove, isolation.Serializable),
exp: false,
},
}

for _, tc := range testCases {
require.Equal(t, tc.exp, tc.m1.Less(tc.m2))
}
}
106 changes: 94 additions & 12 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2962,6 +2962,10 @@ func (kl *keyLocks) acquireLock(
// here.
if tl.getLockMode().Strength == lock.Exclusive ||
tl.getLockMode().Strength == lock.Intent {
// TODO(XXX): we can replace this with recomputeWaitQueues and get rid
// of increasedLockTS entirely. We also don't need to to be strict about
// only calling recomputeWaitQueues if the lock is held with Exclusive
// or Intent lock strength, but it doesn't hurt.
kl.increasedLockTs(afterTs)
}
}
Expand Down Expand Up @@ -3200,14 +3204,18 @@ func (kl *keyLocks) tryClearLock(force bool) bool {
// transaction, else the lock is updated. Returns whether the keyLocks struct
// can be garbage collected, and whether it was held by the txn.
// Acquires l.mu.
func (kl *keyLocks) tryUpdateLock(up *roachpb.LockUpdate) (heldByTxn, gc bool) {
func (kl *keyLocks) tryUpdateLock(
up *roachpb.LockUpdate, st *cluster.Settings,
) (heldByTxn, gc bool) {
kl.mu.Lock()
defer kl.mu.Unlock()
return kl.tryUpdateLockLocked(*up)
return kl.tryUpdateLockLocked(*up, st)
}

// REQUIRES: kl.mu is locked.
func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bool) {
func (kl *keyLocks) tryUpdateLockLocked(
up roachpb.LockUpdate, st *cluster.Settings,
) (heldByTxn, gc bool) {
if kl.isEmptyLock() {
// Already free. This can happen when an unreplicated lock is removed in
// tryActiveWait due to the txn being in the txnStatusCache.
Expand Down Expand Up @@ -3242,6 +3250,7 @@ func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo
txn := &up.Txn
ts := up.Txn.WriteTimestamp
beforeTs := tl.writeTS()
beforeStr := tl.getLockMode().Strength
advancedTs := beforeTs.Less(ts)
isLocked := false
// The MVCC keyspace is the source of truth about the disposition of a
Expand Down Expand Up @@ -3321,14 +3330,10 @@ func (kl *keyLocks) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo
return true, gc
}

if advancedTs {
// We only need to let through non-locking readers if the lock is held with
// strength {Exclusive,Intent}. See acquireLock for an explanation as to
// why.
if tl.getLockMode().Strength == lock.Exclusive ||
tl.getLockMode().Strength == lock.Intent {
kl.increasedLockTs(ts)
}
afterStr := tl.getLockMode().Strength

if beforeStr != afterStr || advancedTs {
kl.recomputeWaitQueues(st)
}
// Else no change for waiters. This can happen due to a race between different
// callers of UpdateLocks().
Expand All @@ -3347,6 +3352,9 @@ func (kl *keyLocks) increasedLockTs(newTs hlc.Timestamp) {
curr := e
e = e.Next()
if g.ts.Less(newTs) {
// TODO(arul): Flip this condition; as written, this short circuits --
// once we've removed the distinguished waiter, we'll stop releasing
// readers that may no longer conflict with the lock.
distinguishedRemoved = distinguishedRemoved || kl.removeReader(curr)
}
// Else don't inform an active waiter which continues to be an active waiter
Expand All @@ -3357,6 +3365,80 @@ func (kl *keyLocks) increasedLockTs(newTs hlc.Timestamp) {
}
}

// recomputeWaitQueues goes through the receiver's wait queues and recomputes
// whether actively waiting requests should continue to do so, given the key's
// locks holders and other waiting requests. Such computation is necessary when
// a lock's strength has decreased[1] or locking requests have dropped out of
// wait queue's[2] without actually acquiring the lock.
//
// [1] This can happen as a result of savepoint rollback or when the lock table
// stops tracking a replicated lock because of a PUSH_TIMESTAMP that
// successfully bumps the pushee's timestamp.
// [2] A locking request that doesn't conflict with any held lock(s) may still
// have to actively wait if it conflicts with a lower sequence numbered request
// already in the lock's wait queue. Locking requests dropping out of a lock's
// wait queue can therefore result in other requests no longer needing to
// actively wait.
//
// TODO(arul): We could optimize this function if we had information about the
// context it was being called in.
//
// REQUIRES: kl.mu to be locked.
func (kl *keyLocks) recomputeWaitQueues(st *cluster.Settings) {
var strongestMode lock.Mode
for e := kl.holders.Front(); e != nil; e = e.Next() {
holder := e.Value
if strongestMode.Less(holder.getLockMode()) {
strongestMode = holder.getLockMode()
}
}

for e := kl.waitingReaders.Front(); e != nil; {
reader := e.Value
curr := e
e = e.Next()
if !lock.Conflicts(reader.curLockMode(), strongestMode, &st.SV) {
kl.removeReader(curr)
}
}
for e := kl.queuedLockingRequests.Front(); e != nil; {
qlr := e.Value
curr := e
e = e.Next()
if !strongestMode.Empty() && lock.Conflicts(qlr.mode, strongestMode, &st.SV) {
break
}
if qlr.active {
// A queued locking request, that's actively waiting, no longer conflicts
// with locks on this key -- it can be allowed to proceed. There's two
// cases:
// 1. If it's a transactional request, it needs to acquire a claim by
// holding its place in the lock wait queue while marking itself as
// inactive.
// 2. Non-transactional requests do not acquire claims, so they can be
// removed from the wait queue.
if qlr.guard.txn == nil {
kl.removeLockingRequest(curr)
} else {
qlr.active = false // mark as inactive
if qlr.guard == kl.distinguishedWaiter {
kl.distinguishedWaiter = nil
}
qlr.guard.mu.Lock()
qlr.guard.doneActivelyWaitingAtLock()
qlr.guard.mu.Unlock()
}
}
// NB: Non-transactional locking requests race with transactional locking
// requests, so we don't want to consider them when deciding whether to
// update strongestMode.
if qlr.guard.txn != nil && strongestMode.Less(qlr.mode) {
strongestMode = qlr.mode
}
}
kl.informActiveWaiters()
}

// removeLockingRequest removes the locking request (or non-transactional
// writer), referenced by the supplied list.Element, from the lock's
// queuedLockingRequests list. Returns whether the request was the distinguished
Expand Down Expand Up @@ -4155,7 +4237,7 @@ func (t *lockTableImpl) updateLockInternal(up *roachpb.LockUpdate) (heldByTxn bo
var locksToGC []*keyLocks
heldByTxn = false
changeFunc := func(l *keyLocks) {
held, gc := l.tryUpdateLock(up)
held, gc := l.tryUpdateLock(up, t.settings)
heldByTxn = heldByTxn || held
if gc {
locksToGC = append(locksToGC, l)
Expand Down
Loading

0 comments on commit 1d21c04

Please sign in to comment.