Skip to content

Commit

Permalink
replica_rac2: rm unused waitingForAdmissionState
Browse files Browse the repository at this point in the history
Also copy the comment explaining why we chose to track individual log
indices rather than compressing them into a single "waiting" interval.

Epic: none
Release note: none
  • Loading branch information
pav-kv committed Sep 19, 2024
1 parent b7ce25b commit 10e0be2
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 320 deletions.
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,26 @@ func (av AdmittedVector) SafeFormat(w redact.SafePrinter, _ rune) {
// write/sync means that all writes made under lower terms, or same term and
// lower log indices, have been completed. A similar guarantee comes for
// admissions at each priority.
//
// For admissions, we've chosen to track individual indices instead of the
// latest index, to allow the system to self correct under inconsistencies
// between the sender (leader) and receiver (replica). For instance, consider
// the case that the sender was tracking indices (3, 5, 8) under a certain
// priority, while the receiver only tracked (3, 8). When 3 is admitted at the
// receiver, the admitted index can advance to 7, allowing 5 to be considered
// admitted at the leader. In comparison, if we only tracked the latest index 8,
// then when 3 was admitted, we could only advance the admitted index to 3.
//
// A secondary reason to track individual indices is that it naturally allows
// the admitted index to advance to stable index without lag in the case where
// there is continuous traffic, but sparseness of indices for a priority. For
// example, if we have indices (10, 20, 30, 40, ...) coming in at high priority,
// and entries are getting admitted with some lag, then when stable=25 and entry
// 20 is admitted, we can advance the admitted index to 25, and not wait for 30
// to be admitted too.
//
// We can revisit the decision to track individual indices if we find the memory
// or compute overhead to be significant.
type LogTracker struct {
// last is the latest log mark observed by the tracker.
last LogMark
Expand Down
119 changes: 1 addition & 118 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@

package replica_rac2

import (
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/errors"
)
import "github.com/cockroachdb/cockroach/pkg/raft/raftpb"

// lowPriOverrideState records which raft log entries have their priority
// overridden to be raftpb.LowPri. Used at follower replicas.
Expand Down Expand Up @@ -192,116 +188,3 @@ func (p *lowPriOverrideState) getEffectivePriority(
}
return pri
}

// waitingForAdmissionState records the indices of individual entries that are
// waiting for admission in the AC queues, grouped by priority.
//
// Entries are added after emerging as MsgStorageAppend in handleRaftReady,
// hence we can expect monotonicity of indices within a leader term, and if
// the term changes, we can assume that non-monotonicity means raft log
// indices being overwritten and the log being truncated. These assumptions
// simplify the processing of add, since it can wipe out higher indices.
//
// We've chosen to track individual indices instead of the latest index, to
// allow the system to self-correct under inconsistencies between the sender
// (leader) and receiver (replica). For instance, consider the case that the
// sender was tracking indices (3, 5, 8) under a certain priority, while the
// receiver only tracked (3, 8) (we've discussed an inaccuracy case in
// lowPriOverrideState, and we also want to be defensive in having some
// self-correcting ability when it comes to distributed protocols). When 3 is
// admitted at the receiver, the admitted index can advance to 7, allowing 5
// to be considered admitted at the leader. In comparison, if we only tracked
// the latest index (8), then when 3 was admitted, we could only advance the
// admitted index to 3.
//
// A secondary reason to track individual indices is that it naturally allows
// the admitted index to advance up to match without lag in the case where
// there is continuous traffic, but sparseness of indices for a priority. For
// example, if we have indices (10, 20, 30, 40, ...) coming in at high
// priority, and entries are getting admitted with some lag, then when
// match=25 and entry 20 is admitted, we can advance the admitted index to 25,
// and not have to wait for 30 to be admitted too.
//
// We can revisit the decision to track individual indices if we find the
// memory or compute overhead to be significant.
type waitingForAdmissionState struct {
// waiting holds, for each priority, the entries still waiting for
// admission, in increasing index order.
//
// Say the indices for a priority are 3, 6, 10. We track the individual
// indices since when 3 is popped, we can advance admitted for that priority
// to 5. When 6 is popped, admitted can advance to 9. We should never have a
// situation where indices are popped out of order, but we tolerate that by
// popping the prefix up to (and including) the index being popped.
waiting [raftpb.NumPriorities][]admissionEntry
}

// admissionEntry identifies a single log entry that is tracked by
// waitingForAdmissionState as waiting for admission.
type admissionEntry struct {
// index is the log index of the waiting entry.
index uint64
// leaderTerm is the leader term that was passed to
// waitingForAdmissionState.add when this entry started being tracked.
leaderTerm uint64
}

// add starts tracking the entry at the given index, received under
// leaderTerm, as waiting for admission at priority pri.
//
// Already-tracked entries at this priority whose index is >= index are
// dropped before the new entry is appended, which keeps the per-priority
// slice sorted by index. Such entries are expected only when a newer leader
// term overwrites a suffix of the log.
//
// In test builds (buildutil.CrdbTestBuild) add panics with an assertion
// failure if a dropped entry has a leaderTerm >= leaderTerm, or if the last
// retained entry has a leaderTerm > leaderTerm.
func (w *waitingForAdmissionState) add(leaderTerm uint64, index uint64, pri raftpb.Priority) {
	queue := w.waiting[pri]
	// keep is the length of the prefix that survives. Scan linearly from the
	// tail; every entry scanned past is removed.
	keep := len(queue)
	for ; keep > 0; keep-- {
		last := queue[keep-1]
		if last.index < index {
			break
		}
		if buildutil.CrdbTestBuild && last.leaderTerm >= leaderTerm {
			panic(errors.AssertionFailedf("overwritten entry has leaderTerm %d >= %d",
				last.leaderTerm, leaderTerm))
		}
	}
	// Uncommon case: keep < len(queue), i.e. some suffix is being overwritten.
	// Reslicing to the same length is a no-op in the common case.
	queue = queue[:keep]
	if buildutil.CrdbTestBuild && keep > 0 && queue[keep-1].leaderTerm > leaderTerm {
		// The message matches the strict comparison above: equal terms are
		// allowed, only a strictly larger retained term is a violation.
		panic(errors.AssertionFailedf("non-monotonic leader terms %d > %d",
			queue[keep-1].leaderTerm, leaderTerm))
	}
	w.waiting[pri] = append(queue, admissionEntry{
		index:      index,
		leaderTerm: leaderTerm,
	})
}

// remove stops tracking, at priority pri, every leading entry whose
// leaderTerm is <= leaderTerm and whose index is <= index. It reports whether
// at least one entry was removed, in which case the admitted index for this
// priority may be able to advance.
//
// The exact (leaderTerm, index) entry is typically present, but it may be
// missing if it was dropped by add when the term advanced and the entry was
// overwritten in the log. Because add keeps the slice ordered, a linear scan
// that strips a prefix is sufficient.
func (w *waitingForAdmissionState) remove(
	leaderTerm uint64, index uint64, pri raftpb.Priority,
) (admittedMayAdvance bool) {
	queue := w.waiting[pri]
	// removed counts the leading entries that are covered by the
	// (leaderTerm, index) being removed.
	removed := 0
	for _, entry := range queue {
		if entry.leaderTerm > leaderTerm || entry.index > index {
			break
		}
		removed++
	}
	w.waiting[pri] = queue[removed:]
	return removed > 0
}

// computeAdmitted returns the admitted index for every priority. For a
// priority with no waiting entries the result is stableIndex; otherwise it is
// the smaller of stableIndex and the index immediately preceding the first
// waiting entry.
func (w *waitingForAdmissionState) computeAdmitted(
	stableIndex uint64,
) [raftpb.NumPriorities]uint64 {
	var admitted [raftpb.NumPriorities]uint64
	for pri := range w.waiting {
		limit := stableIndex
		if queue := w.waiting[pri]; len(queue) > 0 {
			// Nothing at or beyond the first waiting entry is admitted yet.
			if bound := queue[0].index - 1; bound < limit {
				limit = bound
			}
		}
		admitted[pri] = limit
	}
	return admitted
}
59 changes: 0 additions & 59 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,62 +103,3 @@ func readPriority(t *testing.T, d *datadriven.TestData) raftpb.Priority {
}
return 0
}

// TestWaitingForAdmissionState is a data-driven test of
// waitingForAdmissionState. It supports the following commands, all operating
// on a single shared state instance:
//
//	add leader-term=<uint> index=<uint> pri=<priority>
//	  Tracks the index at the given priority and prints the state.
//	remove leader-term=<uint> index=<uint> pri=<priority>
//	  Removes an entry after admission; prints whether anything was removed,
//	  followed by the state.
//	compute-admitted stable-index=<uint>
//	  Prints the admitted index for each priority.
func TestWaitingForAdmissionState(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	var state waitingForAdmissionState

	// dump renders one line per priority listing the entries still waiting.
	dump := func() string {
		var sb strings.Builder
		for pri := range state.waiting {
			fmt.Fprintf(&sb, "%s:", raftpb.Priority(pri))
			for _, e := range state.waiting[pri] {
				fmt.Fprintf(&sb, " (i: %d, term: %d)", e.index, e.leaderTerm)
			}
			fmt.Fprintf(&sb, "\n")
		}
		return sb.String()
	}

	// parseEntryArgs reads the arguments shared by the add and remove commands.
	parseEntryArgs := func(
		t *testing.T, d *datadriven.TestData,
	) (uint64, uint64, raftpb.Priority) {
		var term, idx uint64
		d.ScanArgs(t, "leader-term", &term)
		d.ScanArgs(t, "index", &idx)
		return term, idx, readPriority(t, d)
	}

	run := func(t *testing.T, d *datadriven.TestData) string {
		switch d.Cmd {
		case "add":
			// Example:
			//  add leader-term=3 index=5 pri=LowPri
			term, idx, pri := parseEntryArgs(t, d)
			state.add(term, idx, pri)
			return dump()

		case "remove":
			// Example:
			//  remove leader-term=3 index=5 pri=LowPri
			term, idx, pri := parseEntryArgs(t, d)
			removedAny := state.remove(term, idx, pri)
			return fmt.Sprintf("admittedAdvanced: %t\n%s", removedAny, dump())

		case "compute-admitted":
			// Example:
			//  compute-admitted stable-index=7
			var stable uint64
			d.ScanArgs(t, "stable-index", &stable)
			result := state.computeAdmitted(stable)
			return fmt.Sprintf("admitted: [%d, %d, %d, %d]\n",
				result[0], result[1], result[2], result[3])

		default:
			return fmt.Sprintf("unknown command: %s", d.Cmd)
		}
	}
	datadriven.RunTest(t, datapathutils.TestDataPath(t, "waiting_for_admission_state"), run)
}

This file was deleted.

0 comments on commit 10e0be2

Please sign in to comment.