Skip to content

Commit

Permalink
concurrency: elide signaling new state channel on some no-op updates
Browse files Browse the repository at this point in the history
Previously, various places in the lock table code
(e.g. `informActiveWaiters`) would update a request's waiting state,
and in a few cases, unconditionally signal the new state channel.
This would happen even if the waiting state for the active waiter
had not changed. This was fine, as the contract for signaling the new
state channel makes no guarantees that there has indeed been a state
change -- just that there could have been one.

This patch tightens the cases where the channel is signaled/waiting
state is updated. We elide updates to the waiting state and signaling
the channel in cases where the waiter does not need to take any
meaningful action. This change will allow us to blindly update waiting
states in the future, by calling `informActiveWaiters`, without needing
to decide whether there is a state change or not at the caller.

As a result of this patch, some observability related fields like the
number of waiting readers/writers may be stale. Note that these
could always be stale; this patch just increases the cases where they
may be stale.

Epic: none
Release note: None
  • Loading branch information
arulajmani committed Jun 9, 2023
1 parent 05dded4 commit ff94ccd
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 19 deletions.
72 changes: 54 additions & 18 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,10 +554,33 @@ func (g *lockTableGuardImpl) updateStateToDoneWaitingLocked() {
g.mu.state = waitingState{kind: doneWaiting}
}

// maybeUpdateWaitingStateLocked installs newState as the request's waiting
// state, but only when canElideWaitingStateUpdate reports that the update
// cannot be elided. When the state is updated and notify is true, g.notify()
// is called as well; when the update is elided, nothing is changed and nothing
// is notified, which avoids needlessly nudging a waiting request.
//
// An update is elided when the lock table waiter would not need to take any
// action because of it. In practice, that means updates which only touch
// observability related fields are dropped. Callers that need the state
// overwritten unconditionally should use updateWaitingStateLocked instead.
//
// REQUIRES: g.mu to be locked.
func (g *lockTableGuardImpl) maybeUpdateWaitingStateLocked(newState waitingState, notify bool) {
	if g.canElideWaitingStateUpdate(newState) {
		// Nothing the waiter cares about would change; leave the state alone.
		return
	}
	g.updateWaitingStateLocked(newState)
	if !notify {
		// The caller asked for a silent update.
		return
	}
	g.notify()
}

// updateWaitingStateLocked updates the request's waiting state to the one
// supplied. The supplied waiting state must imply the request is
// still waiting. Typically, this function is called for the first time when
// the request discovers a conflict while scanning the lock table.
//
// REQUIRES: g.mu to be locked.
func (g *lockTableGuardImpl) updateWaitingStateLocked(newState waitingState) {
if newState.kind == doneWaiting {
Expand All @@ -567,6 +590,19 @@ func (g *lockTableGuardImpl) updateWaitingStateLocked(newState waitingState) {
g.mu.state = newState
}

// canElideWaitingStateUpdate reports whether replacing the guard's current
// waiting state with newState can be skipped because it would not make the
// waiter act differently (for example, resume its scan or push a different
// transaction). Updates that only affect observability related fields are
// considered safe to skip.
//
// The decision compares the kind, txn, key and held fields of the current and
// the supplied state; it returns true only if all four match.
//
// REQUIRES: g.mu to be locked.
func (g *lockTableGuardImpl) canElideWaitingStateUpdate(newState waitingState) bool {
	// NB: guardStrength is deliberately left out of the comparison, as it is
	// automatically assigned when updating the state.
	cur := &g.mu.state
	if cur.kind != newState.kind || cur.txn != newState.txn {
		return false
	}
	if !cur.key.Equal(newState.key) {
		return false
	}
	return cur.held == newState.held
}

func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(
lockSpanSet *lockspanset.LockSpanSet,
) (ok bool) {
Expand Down Expand Up @@ -1249,23 +1285,25 @@ func (l *lockState) informActiveWaiters() {
}

for e := l.waitingReaders.Front(); e != nil; e = e.Next() {
state := waitForState
// Since there are waiting readers, we could not have transitioned out of
// or into a state where the lock is held. This is because readers only wait
// for held locks -- they race with other {,non-}transactional writers.
if !waitForState.held {
panic("waiting readers should be empty if lock isn't held")
}
assert(state.held, "waiting readers should be empty if the lock isn't held")
g := e.Value.(*lockTableGuardImpl)
if findDistinguished {
l.distinguishedWaiter = g
findDistinguished = false
}
g.mu.Lock()
g.updateWaitingStateLocked(waitForState)
if l.distinguishedWaiter == g {
g.mu.state.kind = waitForDistinguished
state.kind = waitForDistinguished
}
g.notify()
g.mu.Lock()
// NB: The waiter is actively waiting on this lock, so it's likely taking
// some action based on the previous state (e.g. it may be pushing someone).
// If the state has indeed changed, it must perform a different action -- so
// we pass notify = true here to nudge it to do so.
g.maybeUpdateWaitingStateLocked(state, true /* notify */)
g.mu.Unlock()
}
for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
Expand All @@ -1290,8 +1328,11 @@ func (l *lockState) informActiveWaiters() {
}
}
g.mu.Lock()
g.updateWaitingStateLocked(state)
g.notify()
// NB: The waiter is actively waiting on this lock, so it's likely taking
// some action based on the previous state (e.g. it may be pushing someone).
// If the state has indeed changed, it must perform a different action -- so
// we pass notify = true here to nudge it to do so.
g.maybeUpdateWaitingStateLocked(state, true /* notify */)
g.mu.Unlock()
}
}
Expand Down Expand Up @@ -1740,10 +1781,7 @@ func (l *lockState) tryActiveWait(
g.mu.startWait = true
state := waitForState
state.kind = waitQueueMaxLengthExceeded
g.updateWaitingStateLocked(state)
if notify {
g.notify()
}
g.maybeUpdateWaitingStateLocked(state, notify)
// NOTE: we return wait=true not because the request is waiting, but
// because it should not continue scanning for conflicting locks.
return true, false
Expand Down Expand Up @@ -1795,17 +1833,14 @@ func (l *lockState) tryActiveWait(
if g.isSameTxnAsReservation(waitForState) {
state := waitForState
state.kind = waitSelf
g.updateWaitingStateLocked(state)
g.maybeUpdateWaitingStateLocked(state, notify)
} else {
state := waitForState
if l.distinguishedWaiter == nil {
l.distinguishedWaiter = g
state.kind = waitForDistinguished
}
g.updateWaitingStateLocked(state)
}
if notify {
g.notify()
g.maybeUpdateWaitingStateLocked(state, notify)
}
return true, false
}
Expand Down Expand Up @@ -2623,6 +2658,7 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTa
g.index = -1
g.mu.Lock()
g.mu.startWait = false
g.mu.state = waitingState{}
g.mu.mustFindNextLockAfter = false
g.mu.Unlock()
g.toResolve = g.toResolve[:0]
Expand Down
35 changes: 35 additions & 0 deletions pkg/kv/kvserver/concurrency/lock_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package concurrency
import (
"context"
"fmt"
"reflect"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -1815,3 +1816,37 @@ func TestLockStateSafeFormat(t *testing.T) {
" lock: ‹×›\n holder: txn: 6ba7b810-9dad-11d1-80b4-00c04fd430c8, ts: 0.000000123,7, info: repl epoch: 0, seqs: [1]\n",
redact.Sprint(l).Redact())
}

// TestCanElideWaitingStateUpdateConsidersAllFields ensures all fields in the
// waitingState struct have been considered for inclusion/non-inclusion in the
// logic of canElideWaitingStateUpdate. The test doesn't check if the
// inclusion/non-inclusion claims are actually reflected in the logic; however,
// it does serve as a glorified nudge to consider new fields added to
// waitingState for inclusion/non-inclusion in canElideWaitingStateUpdate's
// logic.
func TestCanElideWaitingStateUpdateConsidersAllFields(t *testing.T) {
	type inclusionStatus bool
	const (
		includeWhenDeciding      inclusionStatus = true
		doNotIncludeWhenDeciding inclusionStatus = false
	)
	// The values document the intended treatment of each field; only the
	// presence of a key is checked below.
	fieldMap := map[string]inclusionStatus{
		"kind":          includeWhenDeciding,
		"txn":           includeWhenDeciding,
		"key":           includeWhenDeciding,
		"held":          includeWhenDeciding,
		"queuedWriters": doNotIncludeWhenDeciding,
		"queuedReaders": doNotIncludeWhenDeciding,
		"guardStrength": doNotIncludeWhenDeciding,
	}
	// Walk every field declared on waitingState and fail on the first one that
	// has no entry in fieldMap.
	typ := reflect.TypeOf(waitingState{})
	for i := 0; i < typ.NumField(); i++ {
		fieldName := typ.Field(i).Name
		if _, ok := fieldMap[fieldName]; !ok {
			t.Fatalf("%s field not considered", fieldName)
		}
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ num=1

guard-state r=req7
----
new: state=waitForDistinguished txn=txn1 key="a" held=true guard-strength=None
old: state=waitForDistinguished txn=txn1 key="a" held=true guard-strength=None

# ------------------------------------------------------------------------------
# Try to re-acquire an unreplicated lock that is already held at a higher epoch.
Expand Down

0 comments on commit ff94ccd

Please sign in to comment.