diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index e00609e3e32f..f4ea5b85d06a 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -554,10 +554,33 @@ func (g *lockTableGuardImpl) updateStateToDoneWaitingLocked() { g.mu.state = waitingState{kind: doneWaiting} } +// maybeUpdateWaitingStateLocked updates the request's waiting state if the +// supplied state is meaningfully different[1]. The request's state change +// channel is signaled if the waiting state is updated and the caller has +// dictated such. Eliding updates, and more importantly notifications to the +// state change channel, avoids needlessly nudging a waiting request. +// +// [1] The state is not updated if the lock table waiter does not need to take +// action as a result of the update. In practice, this means updates to +// observability related fields are elided. See updateWaitingStateLocked if this +// behavior is undesirable. +// +// REQUIRES: g.mu to be locked. +func (g *lockTableGuardImpl) maybeUpdateWaitingStateLocked(newState waitingState, notify bool) { + if g.canElideWaitingStateUpdate(newState) { + return // the update isn't meaningful; early return + } + g.updateWaitingStateLocked(newState) + if notify { + g.notify() + } +} + // updateWaitingStateLocked updates the request's waiting state to indicate // to the one supplied. The supplied waiting state must imply the request is // still waiting. Typically, this function is called for the first time when // the request discovers a conflict while scanning the lock table. +// // REQUIRES: g.mu to be locked. func (g *lockTableGuardImpl) updateWaitingStateLocked(newState waitingState) { if newState.kind == doneWaiting { @@ -567,6 +590,19 @@ func (g *lockTableGuardImpl) updateWaitingStateLocked(newState waitingState) { g.mu.state = newState } +// canElideWaitingStateUpdate returns true if updating the guard's waiting state +// to the supplied waitingState would not cause the waiter to take a different +// action, such as proceeding with its scan or pushing a different transaction. +// Notably, observability related updates are considered fair game for elision. +// +// REQUIRES: g.mu to be locked. +func (g *lockTableGuardImpl) canElideWaitingStateUpdate(newState waitingState) bool { + // Note that we don't need to check newState.guardStrength as it's + // automatically assigned when updating the state. + return g.mu.state.kind == newState.kind && g.mu.state.txn == newState.txn && + g.mu.state.key.Equal(newState.key) && g.mu.state.held == newState.held +} + func (g *lockTableGuardImpl) CheckOptimisticNoConflicts( lockSpanSet *lockspanset.LockSpanSet, ) (ok bool) { @@ -1249,23 +1285,25 @@ func (l *lockState) informActiveWaiters() { } for e := l.waitingReaders.Front(); e != nil; e = e.Next() { + state := waitForState // Since there are waiting readers, we could not have transitioned out of // or into a state where the lock is held. This is because readers only wait // for held locks -- they race with other {,non-}transactional writers. - if !waitForState.held { - panic("waiting readers should be empty if lock isn't held") - } + assert(state.held, "waiting readers should be empty if the lock isn't held") g := e.Value.(*lockTableGuardImpl) if findDistinguished { l.distinguishedWaiter = g findDistinguished = false } - g.mu.Lock() - g.updateWaitingStateLocked(waitForState) if l.distinguishedWaiter == g { - g.mu.state.kind = waitForDistinguished + state.kind = waitForDistinguished } - g.notify() + g.mu.Lock() + // NB: The waiter is actively waiting on this lock, so it's likely taking + // some action based on the previous state (e.g. it may be pushing someone). + // If the state has indeed changed, it must perform a different action -- so + // we pass notify = true here to nudge it to do so. + g.maybeUpdateWaitingStateLocked(state, true /* notify */) g.mu.Unlock() } for e := l.queuedWriters.Front(); e != nil; e = e.Next() { @@ -1290,8 +1328,11 @@ func (l *lockState) informActiveWaiters() { } } g.mu.Lock() - g.updateWaitingStateLocked(state) - g.notify() + // NB: The waiter is actively waiting on this lock, so it's likely taking + // some action based on the previous state (e.g. it may be pushing someone). + // If the state has indeed changed, it must perform a different action -- so + // we pass notify = true here to nudge it to do so. + g.maybeUpdateWaitingStateLocked(state, true /* notify */) g.mu.Unlock() } } @@ -1740,10 +1781,7 @@ func (l *lockState) tryActiveWait( g.mu.startWait = true state := waitForState state.kind = waitQueueMaxLengthExceeded - g.updateWaitingStateLocked(state) - if notify { - g.notify() - } + g.maybeUpdateWaitingStateLocked(state, notify) // NOTE: we return wait=true not because the request is waiting, but // because it should not continue scanning for conflicting locks. return true, false @@ -1795,17 +1833,14 @@ func (l *lockState) tryActiveWait( if g.isSameTxnAsReservation(waitForState) { state := waitForState state.kind = waitSelf - g.updateWaitingStateLocked(state) + g.maybeUpdateWaitingStateLocked(state, notify) } else { state := waitForState if l.distinguishedWaiter == nil { l.distinguishedWaiter = g state.kind = waitForDistinguished } - g.updateWaitingStateLocked(state) - } - if notify { - g.notify() + g.maybeUpdateWaitingStateLocked(state, notify) } return true, false } @@ -2623,6 +2658,7 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTa g.index = -1 g.mu.Lock() g.mu.startWait = false + g.mu.state = waitingState{} g.mu.mustFindNextLockAfter = false g.mu.Unlock() g.toResolve = g.toResolve[:0] diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go index 30b69fff8557..12bd2b12fe27 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_test.go @@ -13,6 +13,7 @@ package concurrency import ( "context" "fmt" + "reflect" "runtime" "strconv" "strings" @@ -1815,3 +1816,37 @@ func TestLockStateSafeFormat(t *testing.T) { " lock: ‹×›\n holder: txn: 6ba7b810-9dad-11d1-80b4-00c04fd430c8, ts: 0.000000123,7, info: repl epoch: 0, seqs: [1]\n", redact.Sprint(l).Redact()) } + +// TestElideWaitingStateUpdatesConsidersAllFields ensures all fields in the +// waitingState struct have been considered for inclusion/non-inclusion in the +// logic of canElideWaitingStateUpdate. The test doesn't check if the +// inclusion/non-inclusion claims are actually reflected in the logic; however, +// it does serve as a glorified nudge to consider new fields added to +// waitingState for inclusion/non-inclusion in canElideWaitingStateUpdate's +// logic. +func TestCanElideWaitingStateUpdateConsidersAllFields(t *testing.T) { + type inclusionStatus bool + const ( + includeWhenDeciding inclusionStatus = true + doNotIncludeWhenDeciding inclusionStatus = false + ) + fieldMap := map[string]inclusionStatus{ + "kind": includeWhenDeciding, + "txn": includeWhenDeciding, + "key": includeWhenDeciding, + "held": includeWhenDeciding, + "queuedWriters": doNotIncludeWhenDeciding, + "queuedReaders": doNotIncludeWhenDeciding, + "guardStrength": doNotIncludeWhenDeciding, + } + ws := waitingState{} + typ := reflect.ValueOf(ws).Type() + for i := 0; i < typ.NumField(); i++ { + fieldName := typ.Field(i).Name + _, ok := fieldMap[fieldName] + if !ok { + t.Fatalf("%s field not considered", fieldName) + } + typ.Field(i) + } +} diff --git a/pkg/kv/kvserver/concurrency/testdata/lock_table/lock_changes b/pkg/kv/kvserver/concurrency/testdata/lock_table/lock_changes index c1ef6974864b..80ffc602dcb9 100644 --- a/pkg/kv/kvserver/concurrency/testdata/lock_table/lock_changes +++ b/pkg/kv/kvserver/concurrency/testdata/lock_table/lock_changes @@ -193,7 +193,7 @@ num=1 guard-state r=req7 ---- -new: state=waitForDistinguished txn=txn1 key="a" held=true guard-strength=None +old: state=waitForDistinguished txn=txn1 key="a" held=true guard-strength=None # ------------------------------------------------------------------------------ # Try to re-acquire an unreplicated lock that is already held at a higher epoch.