From ff94ccd202b03f2650e42e1a0de8c633df1c68e8 Mon Sep 17 00:00:00 2001
From: Arul Ajmani
Date: Fri, 2 Jun 2023 15:16:06 -0400
Subject: [PATCH] concurrency: elide signaling new state channel on some no-op
 updates

Previously, various places in the lock table code (e.g.
`informActiveWaiters`) would update a request's waiting state, and in a
few cases, unconditionally signal the new state channel. This would
happen even if the waiting state for the active waiter had not changed.
This was fine, as the contract for signaling the new state channel makes
no guarantees that there has indeed been a state change -- just that
there could have been one.

This patch tightens the cases where the channel is signaled and the
waiting state is updated. We elide both in cases where the waiter does
not need to take any meaningful action. This will allow us to blindly
update waiting states in the future, by calling `informActiveWaiters`,
without the caller needing to decide whether there is a state change.

As a result of this patch, some observability-related fields, such as
the number of waiting readers/writers, may be stale. Note that these
could always be stale; this patch just increases the cases where they
may be.

Epic: none

Release note: None
---
 pkg/kv/kvserver/concurrency/lock_table.go    | 72 ++++++++++++++-----
 .../kvserver/concurrency/lock_table_test.go  | 35 +++++++++
 .../testdata/lock_table/lock_changes         |  2 +-
 3 files changed, 90 insertions(+), 19 deletions(-)

diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go
index e00609e3e32f..f4ea5b85d06a 100644
--- a/pkg/kv/kvserver/concurrency/lock_table.go
+++ b/pkg/kv/kvserver/concurrency/lock_table.go
@@ -554,10 +554,33 @@ func (g *lockTableGuardImpl) updateStateToDoneWaitingLocked() {
 	g.mu.state = waitingState{kind: doneWaiting}
 }
 
+// maybeUpdateWaitingStateLocked updates the request's waiting state if the
+// supplied state is meaningfully different[1]. The request's state change
+// channel is signaled if the waiting state is updated and the caller has
+// dictated such. Eliding updates, and more importantly notifications to the
+// state change channel, avoids needlessly nudging a waiting request.
+//
+// [1] The state is not updated if the lock table waiter does not need to take
+// action as a result of the update. In practice, this means updates to
+// observability related fields are elided. See updateWaitingStateLocked if this
+// behavior is undesirable.
+//
+// REQUIRES: g.mu to be locked.
+func (g *lockTableGuardImpl) maybeUpdateWaitingStateLocked(newState waitingState, notify bool) {
+	if g.canElideWaitingStateUpdate(newState) {
+		return // the update isn't meaningful; early return
+	}
+	g.updateWaitingStateLocked(newState)
+	if notify {
+		g.notify()
+	}
+}
+
 // updateWaitingStateLocked updates the request's waiting state to indicate
 // to the one supplied. The supplied waiting state must imply the request is
 // still waiting. Typically, this function is called for the first time when
 // the request discovers a conflict while scanning the lock table.
+//
 // REQUIRES: g.mu to be locked.
 func (g *lockTableGuardImpl) updateWaitingStateLocked(newState waitingState) {
 	if newState.kind == doneWaiting {
@@ -567,6 +590,19 @@ func (g *lockTableGuardImpl) updateWaitingStateLocked(newState waitingState) {
 	g.mu.state = newState
 }
 
+// canElideWaitingStateUpdate returns true if updating the guard's waiting state
+// to the supplied waitingState would not cause the waiter to take a different
+// action, such as proceeding with its scan or pushing a different transaction.
+// Notably, observability related updates are considered fair game for elision.
+//
+// REQUIRES: g.mu to be locked.
+func (g *lockTableGuardImpl) canElideWaitingStateUpdate(newState waitingState) bool {
+	// Note that we don't need to check newState.guardStrength as it's
+	// automatically assigned when updating the state.
+	return g.mu.state.kind == newState.kind && g.mu.state.txn == newState.txn &&
+		g.mu.state.key.Equal(newState.key) && g.mu.state.held == newState.held
+}
+
 func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(
 	lockSpanSet *lockspanset.LockSpanSet,
 ) (ok bool) {
@@ -1249,23 +1285,25 @@ func (l *lockState) informActiveWaiters() {
 	}
 	for e := l.waitingReaders.Front(); e != nil; e = e.Next() {
+		state := waitForState
 		// Since there are waiting readers, we could not have transitioned out of
 		// or into a state where the lock is held. This is because readers only wait
 		// for held locks -- they race with other {,non-}transactional writers.
-		if !waitForState.held {
-			panic("waiting readers should be empty if lock isn't held")
-		}
+		assert(state.held, "waiting readers should be empty if the lock isn't held")
 		g := e.Value.(*lockTableGuardImpl)
 		if findDistinguished {
 			l.distinguishedWaiter = g
 			findDistinguished = false
 		}
-		g.mu.Lock()
-		g.updateWaitingStateLocked(waitForState)
 		if l.distinguishedWaiter == g {
-			g.mu.state.kind = waitForDistinguished
+			state.kind = waitForDistinguished
 		}
-		g.notify()
+		g.mu.Lock()
+		// NB: The waiter is actively waiting on this lock, so it's likely taking
+		// some action based on the previous state (e.g. it may be pushing someone).
+		// If the state has indeed changed, it must perform a different action -- so
+		// we pass notify = true here to nudge it to do so.
+		g.maybeUpdateWaitingStateLocked(state, true /* notify */)
 		g.mu.Unlock()
 	}
 	for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
@@ -1290,8 +1328,11 @@ func (l *lockState) informActiveWaiters() {
 			}
 		}
 		g.mu.Lock()
-		g.updateWaitingStateLocked(state)
-		g.notify()
+		// NB: The waiter is actively waiting on this lock, so it's likely taking
+		// some action based on the previous state (e.g. it may be pushing someone).
+		// If the state has indeed changed, it must perform a different action -- so
+		// we pass notify = true here to nudge it to do so.
+		g.maybeUpdateWaitingStateLocked(state, true /* notify */)
 		g.mu.Unlock()
 	}
 }
@@ -1740,10 +1781,7 @@ func (l *lockState) tryActiveWait(
 		g.mu.startWait = true
 		state := waitForState
 		state.kind = waitQueueMaxLengthExceeded
-		g.updateWaitingStateLocked(state)
-		if notify {
-			g.notify()
-		}
+		g.maybeUpdateWaitingStateLocked(state, notify)
 		// NOTE: we return wait=true not because the request is waiting, but
 		// because it should not continue scanning for conflicting locks.
 		return true, false
@@ -1795,17 +1833,14 @@ func (l *lockState) tryActiveWait(
 	if g.isSameTxnAsReservation(waitForState) {
 		state := waitForState
 		state.kind = waitSelf
-		g.updateWaitingStateLocked(state)
+		g.maybeUpdateWaitingStateLocked(state, notify)
 	} else {
 		state := waitForState
 		if l.distinguishedWaiter == nil {
 			l.distinguishedWaiter = g
 			state.kind = waitForDistinguished
 		}
-		g.updateWaitingStateLocked(state)
-	}
-	if notify {
-		g.notify()
+		g.maybeUpdateWaitingStateLocked(state, notify)
 	}
 	return true, false
 }
@@ -2623,6 +2658,7 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTa
 		g.index = -1
 		g.mu.Lock()
 		g.mu.startWait = false
+		g.mu.state = waitingState{}
 		g.mu.mustFindNextLockAfter = false
 		g.mu.Unlock()
 		g.toResolve = g.toResolve[:0]
diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go
index 30b69fff8557..12bd2b12fe27 100644
--- a/pkg/kv/kvserver/concurrency/lock_table_test.go
+++ b/pkg/kv/kvserver/concurrency/lock_table_test.go
@@ -13,6 +13,7 @@ package concurrency
 import (
 	"context"
 	"fmt"
+	"reflect"
 	"runtime"
 	"strconv"
 	"strings"
@@ -1815,3 +1816,37 @@ func TestLockStateSafeFormat(t *testing.T) {
 		" lock: ‹×›\n holder: txn: 6ba7b810-9dad-11d1-80b4-00c04fd430c8, ts: 0.000000123,7, info: repl epoch: 0, seqs: [1]\n",
 		redact.Sprint(l).Redact())
 }
+
+// TestCanElideWaitingStateUpdateConsidersAllFields ensures all fields in the
+// waitingState struct have been considered for inclusion/non-inclusion in the
+// logic of canElideWaitingStateUpdate. The test doesn't check whether the
+// inclusion/non-inclusion claims are actually reflected in that logic; however,
+// it does serve as a glorified nudge to consider new fields added to
+// waitingState for inclusion/non-inclusion in canElideWaitingStateUpdate's
+// logic.
+func TestCanElideWaitingStateUpdateConsidersAllFields(t *testing.T) {
+	type inclusionStatus bool
+	const (
+		includeWhenDeciding      inclusionStatus = true
+		doNotIncludeWhenDeciding inclusionStatus = false
+	)
+	fieldMap := map[string]inclusionStatus{
+		"kind":          includeWhenDeciding,
+		"txn":           includeWhenDeciding,
+		"key":           includeWhenDeciding,
+		"held":          includeWhenDeciding,
+		"queuedWriters": doNotIncludeWhenDeciding,
+		"queuedReaders": doNotIncludeWhenDeciding,
+		"guardStrength": doNotIncludeWhenDeciding,
+	}
+	ws := waitingState{}
+	typ := reflect.ValueOf(ws).Type()
+	for i := 0; i < typ.NumField(); i++ {
+		fieldName := typ.Field(i).Name
+		_, ok := fieldMap[fieldName]
+		if !ok {
+			t.Fatalf("%s field not considered", fieldName)
+		}
+	}
+}
diff --git a/pkg/kv/kvserver/concurrency/testdata/lock_table/lock_changes b/pkg/kv/kvserver/concurrency/testdata/lock_table/lock_changes
index c1ef6974864b..80ffc602dcb9 100644
--- a/pkg/kv/kvserver/concurrency/testdata/lock_table/lock_changes
+++ b/pkg/kv/kvserver/concurrency/testdata/lock_table/lock_changes
@@ -193,7 +193,7 @@ num=1
 
 guard-state r=req7
 ----
-new: state=waitForDistinguished txn=txn1 key="a" held=true guard-strength=None
+old: state=waitForDistinguished txn=txn1 key="a" held=true guard-strength=None
 
 # ------------------------------------------------------------------------------
 # Try to re-acquire an unreplicated lock that is already held at a higher epoch.
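
A standalone sketch of the elision rule above, for illustration only; it is not
part of the patch. The types and names here (miniWaitingState, canElide, the
string txnID) are hypothetical stand-ins for the real lock table types: only the
fields the waiter acts on (kind, txn, key, held) participate in the comparison,
while observability-only fields such as the queued writer/reader counts do not,
so updates that touch only those fields are elided.

package main

import (
	"bytes"
	"fmt"
)

type stateKind int

const (
	waitFor stateKind = iota
	waitForDistinguished
)

// miniWaitingState is a pared-down stand-in for the lock table's waitingState.
type miniWaitingState struct {
	kind          stateKind
	txnID         string // stand-in for the conflicting transaction's metadata
	key           []byte
	held          bool
	queuedWriters int // observability only
	queuedReaders int // observability only
}

// canElide mirrors the shape of canElideWaitingStateUpdate: the update is a
// no-op for the waiter if none of the actionable fields changed.
func canElide(cur, next miniWaitingState) bool {
	return cur.kind == next.kind && cur.txnID == next.txnID &&
		bytes.Equal(cur.key, next.key) && cur.held == next.held
}

func main() {
	cur := miniWaitingState{kind: waitFor, txnID: "txn1", key: []byte("a"), held: true, queuedWriters: 1}

	// Only the queued-writer count changed: the waiter's action is unchanged,
	// so the state update (and the channel signal) can be elided.
	next := cur
	next.queuedWriters = 2
	fmt.Println(canElide(cur, next)) // true

	// The conflicting transaction changed: the waiter must be nudged so it can
	// push someone else, so the update cannot be elided.
	next = cur
	next.txnID = "txn2"
	fmt.Println(canElide(cur, next)) // false
}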