Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

concurrency: elide signaling new state channel on some no-op updates #104537

Merged
merged 1 commit into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 54 additions & 18 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,10 +554,33 @@ func (g *lockTableGuardImpl) updateStateToDoneWaitingLocked() {
g.mu.state = waitingState{kind: doneWaiting}
}

// maybeUpdateWaitingStateLocked updates the request's waiting state if the
// supplied state is meaningfully different[1]. The request's state change
// channel is signaled if the waiting state is updated and the caller has
// dictated such. Eliding updates, and more importantly notifications to the
// state change channel, avoids needlessly nudging a waiting request.
//
// [1] The state is not updated if the lock table waiter does not need to take
// action as a result of the update. In practice, this means updates to
// observability related fields are elided. See updateWaitingStateLocked if this
// behavior is undesirable.
//
// REQUIRES: g.mu to be locked.
func (g *lockTableGuardImpl) maybeUpdateWaitingStateLocked(newState waitingState, notify bool) {
if g.canElideWaitingStateUpdate(newState) {
return // the update isn't meaningful; early return
}
g.updateWaitingStateLocked(newState)
if notify {
g.notify()
}
}

// updateWaitingStateLocked updates the request's waiting state to indicate
// to the one supplied. The supplied waiting state must imply the request is
// still waiting. Typically, this function is called for the first time when
// the request discovers a conflict while scanning the lock table.
//
// REQUIRES: g.mu to be locked.
func (g *lockTableGuardImpl) updateWaitingStateLocked(newState waitingState) {
if newState.kind == doneWaiting {
Expand All @@ -567,6 +590,19 @@ func (g *lockTableGuardImpl) updateWaitingStateLocked(newState waitingState) {
g.mu.state = newState
}

// canElideWaitingStateUpdate returns true if updating the guard's waiting state
// to the supplied waitingState would not cause the waiter to take a different
// action, such as proceeding with its scan or pushing a different transaction.
// Notably, observability related updates are considered fair game for elision.
//
// REQUIRES: g.mu to be locked.
func (g *lockTableGuardImpl) canElideWaitingStateUpdate(newState waitingState) bool {
// Note that we don't need to check newState.guardStrength as it's
// automatically assigned when updating the state.
return g.mu.state.kind == newState.kind && g.mu.state.txn == newState.txn &&
g.mu.state.key.Equal(newState.key) && g.mu.state.held == newState.held
}

func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(
lockSpanSet *lockspanset.LockSpanSet,
) (ok bool) {
Expand Down Expand Up @@ -1249,23 +1285,25 @@ func (l *lockState) informActiveWaiters() {
}

for e := l.waitingReaders.Front(); e != nil; e = e.Next() {
state := waitForState
// Since there are waiting readers, we could not have transitioned out of
// or into a state where the lock is held. This is because readers only wait
// for held locks -- they race with other {,non-}transactional writers.
if !waitForState.held {
panic("waiting readers should be empty if lock isn't held")
}
assert(state.held, "waiting readers should be empty if the lock isn't held")
g := e.Value.(*lockTableGuardImpl)
if findDistinguished {
l.distinguishedWaiter = g
findDistinguished = false
}
g.mu.Lock()
g.updateWaitingStateLocked(waitForState)
if l.distinguishedWaiter == g {
g.mu.state.kind = waitForDistinguished
state.kind = waitForDistinguished
}
g.notify()
g.mu.Lock()
// NB: The waiter is actively waiting on this lock, so it's likely taking
// some action based on the previous state (e.g. it may be pushing someone).
// If the state has indeed changed, it must perform a different action -- so
// we pass notify = true here to nudge it to do so.
g.maybeUpdateWaitingStateLocked(state, true /* notify */)
g.mu.Unlock()
}
for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
Expand All @@ -1290,8 +1328,11 @@ func (l *lockState) informActiveWaiters() {
}
}
g.mu.Lock()
g.updateWaitingStateLocked(state)
g.notify()
// NB: The waiter is actively waiting on this lock, so it's likely taking
// some action based on the previous state (e.g. it may be pushing someone).
// If the state has indeed changed, it must perform a different action -- so
// we pass notify = true here to nudge it to do so.
g.maybeUpdateWaitingStateLocked(state, true /* notify */)
g.mu.Unlock()
}
}
Expand Down Expand Up @@ -1740,10 +1781,7 @@ func (l *lockState) tryActiveWait(
g.mu.startWait = true
state := waitForState
state.kind = waitQueueMaxLengthExceeded
g.updateWaitingStateLocked(state)
if notify {
g.notify()
}
g.maybeUpdateWaitingStateLocked(state, notify)
// NOTE: we return wait=true not because the request is waiting, but
// because it should not continue scanning for conflicting locks.
return true, false
Expand Down Expand Up @@ -1795,17 +1833,14 @@ func (l *lockState) tryActiveWait(
if g.isSameTxnAsReservation(waitForState) {
state := waitForState
state.kind = waitSelf
g.updateWaitingStateLocked(state)
g.maybeUpdateWaitingStateLocked(state, notify)
} else {
state := waitForState
if l.distinguishedWaiter == nil {
l.distinguishedWaiter = g
state.kind = waitForDistinguished
}
g.updateWaitingStateLocked(state)
}
if notify {
g.notify()
g.maybeUpdateWaitingStateLocked(state, notify)
}
return true, false
}
Expand Down Expand Up @@ -2623,6 +2658,7 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTa
g.index = -1
g.mu.Lock()
g.mu.startWait = false
g.mu.state = waitingState{}
g.mu.mustFindNextLockAfter = false
g.mu.Unlock()
g.toResolve = g.toResolve[:0]
Expand Down
35 changes: 35 additions & 0 deletions pkg/kv/kvserver/concurrency/lock_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package concurrency
import (
"context"
"fmt"
"reflect"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -1815,3 +1816,37 @@ func TestLockStateSafeFormat(t *testing.T) {
" lock: ‹×›\n holder: txn: 6ba7b810-9dad-11d1-80b4-00c04fd430c8, ts: 0.000000123,7, info: repl epoch: 0, seqs: [1]\n",
redact.Sprint(l).Redact())
}

// TestElideWaitingStateUpdatesConsidersAllFields ensures all fields in the
// waitingState struct have been considered for inclusion/non-inclusion in the
// logic of canElideWaitingStateUpdate. The test doesn't check if the
// inclusion/non-inclusion claims are actually reflected in the logic; however,
// it does serve as a glorified nudge to consider new fields added to
// waitingState for inclusion/non-inclusion in canElideWaitingStateUpdate's
// logic.
func TestCanElideWaitingStateUpdateConsidersAllFields(t *testing.T) {
type inclusionStatus bool
const (
includeWhenDeciding inclusionStatus = true
doNotIncludeWhenDeciding inclusionStatus = false
)
fieldMap := map[string]inclusionStatus{
"kind": includeWhenDeciding,
"txn": includeWhenDeciding,
"key": includeWhenDeciding,
"held": includeWhenDeciding,
"queuedWriters": doNotIncludeWhenDeciding,
"queuedReaders": doNotIncludeWhenDeciding,
"guardStrength": doNotIncludeWhenDeciding,
}
ws := waitingState{}
typ := reflect.ValueOf(ws).Type()
for i := 0; i < typ.NumField(); i++ {
fieldName := typ.Field(i).Name
_, ok := fieldMap[fieldName]
if !ok {
t.Fatalf("%s field not considered", fieldName)
}
typ.Field(i)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ num=1

guard-state r=req7
----
new: state=waitForDistinguished txn=txn1 key="a" held=true guard-strength=None
old: state=waitForDistinguished txn=txn1 key="a" held=true guard-strength=None

# ------------------------------------------------------------------------------
# Try to re-acquire an unreplicated lock that is already held at a higher epoch.
Expand Down