diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go index 21935a88018a..bd51c4ba70ef 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go @@ -80,6 +80,26 @@ func (av AdmittedVector) SafeFormat(w redact.SafePrinter, _ rune) { // write/sync means that all writes made under lower terms, or same term and // lower log indices, have been completed. A similar guarantee comes for // admissions at each priority. +// +// For admissions, we've chosen to track individual indices instead of the +// latest index, to allow the system to self correct under inconsistencies +// between the sender (leader) and receiver (replica). For instance, consider +// the case that the sender was tracking indices (3, 5, 8) under a certain +// priority, while the receiver only tracked (3, 8). When 3 is admitted at the +// receiver, the admitted index can advance to 7, allowing 5 to be considered +// admitted at the leader. In comparison, if we only tracked the latest index 8, +// then when 3 was admitted, we could only advance the admitted index to 3. +// +// A secondary reason to track individual indices is that it naturally allows +// the admitted index to advance to stable index without lag in the case where +// there is continuous traffic, but sparseness of indices for a priority. For +// example, if we have indices (10, 20, 30, 40, ...) coming in at high priority, +// and entries are getting admitted with some lag, then when stable=25 and entry +// 20 is admitted, we can advance the admitted index to 25, and not wait for 30 +// to be admitted too. +// +// We can revisit the decision to track individual indices if we find the memory +// or compute overhead to be significant. type LogTracker struct { // last is the latest log mark observed by the tracker. last LogMark diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go index 865484ce0e99..476664888c0f 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go @@ -10,11 +10,7 @@ package replica_rac2 -import ( - "github.com/cockroachdb/cockroach/pkg/raft/raftpb" - "github.com/cockroachdb/cockroach/pkg/util/buildutil" - "github.com/cockroachdb/errors" -) +import "github.com/cockroachdb/cockroach/pkg/raft/raftpb" // lowPriOverrideState records which raft log entries have their priority // overridden to be raftpb.LowPri. Used at follower replicas. @@ -192,116 +188,3 @@ func (p *lowPriOverrideState) getEffectivePriority( } return pri } - -// waitingForAdmissionState records the indices of individual entries that are -// waiting for admission in the AC queues. These are added after emerging as -// MsgStorageAppend in handleRaftReady, hence we can expect monotonicity of -// indices within a leader term, and if the term changes, we can assume that -// non-monotonicity means raft log indices being overwritten and the log being -// truncated. These assumptions simplify the processing of add, since it can -// wipe out higher indices. -// -// We've chosen to track individual indices instead of the latest index, to -// allow the system to self correct under inconsistencies between the sender -// (leader) and receiver (replica). For instance, consider the case that the -// sender was tracking indices (3, 5, 8) under a certain priority, while the -// receiver only tracked (3, 8) (we've discussed an inaccuracy case in -// lowPriOverrideState, and we also want to be defensive in having some -// self-correcting ability when it comes to distributed protocols). When 3 is -// admitted at the receiver, the admitted index can advance to 7, allowing 5 -// to be considered admitted at the leader. In comparison, if we only tracked -// the latest index (8), then when 3 was admitted, we could only advance the -// admitted index to 3. A secondary reason to track individual indices is that -// it naturally allows the admitted index to advance up to match without lag -// in the case where there is continuous traffic, but sparseness of indices -// for a priority. For example, if we have indices (10, 20, 30, 40, ...) -// coming in at high priority, and entries are getting admitted with some lag, -// then when match=25 and entry 20 is admitted, we can advance the admitted -// index to 25, and not have to wait for 30 to be admitted too. -// -// We can revisit the decision to track individual indices if we find the -// memory or compute overhead to be significant. -type waitingForAdmissionState struct { - // The indices for each priority are in increasing index order. - // - // Say the indices for a priority are 3, 6, 10. We track the individual - // indices since when 3 is popped, we can advance admitted for that priority - // to 5. When 6 is popped, admitted can advance to 9. We should never have a - // situation where indices are popped out of order, but we tolerate that by - // popping the prefix upto the index being popped. - waiting [raftpb.NumPriorities][]admissionEntry -} - -type admissionEntry struct { - index uint64 - leaderTerm uint64 -} - -func (w *waitingForAdmissionState) add(leaderTerm uint64, index uint64, pri raftpb.Priority) { - n := len(w.waiting[pri]) - i := n - // Linear scan, and all the scanned items will be removed. - for ; i > 0; i-- { - if w.waiting[pri][i-1].index < index { - break - } - if buildutil.CrdbTestBuild { - if overwrittenTerm := w.waiting[pri][i-1].leaderTerm; overwrittenTerm >= leaderTerm { - panic(errors.AssertionFailedf("overwritten entry has leaderTerm %d >= %d", - overwrittenTerm, leaderTerm)) - } - } - } - // Uncommon case: i < n. - if i < n { - w.waiting[pri] = w.waiting[pri][:i] - n = i - } - if buildutil.CrdbTestBuild { - if n > 0 && w.waiting[pri][n-1].leaderTerm > leaderTerm { - panic(errors.AssertionFailedf("non-monotonic leader terms %d >= %d", - w.waiting[pri][n-1].leaderTerm, leaderTerm)) - } - } - w.waiting[pri] = append(w.waiting[pri], admissionEntry{ - index: index, - leaderTerm: leaderTerm, - }) -} - -func (w *waitingForAdmissionState) remove( - leaderTerm uint64, index uint64, pri raftpb.Priority, -) (admittedMayAdvance bool) { - // We expect to typically find the exact entry, except if it was removed - // because of the term advancing and the entry being overwritten in the - // log. Due to the monotonicity assumption in add, we simply do a linear - // scan and remove a prefix. - // - // pos is the last entry that is removed. - pos := -1 - n := len(w.waiting[pri]) - for ; pos+1 < n; pos++ { - w := w.waiting[pri][pos+1] - if w.leaderTerm > leaderTerm || w.index > index { - break - } - } - w.waiting[pri] = w.waiting[pri][pos+1:] - return pos >= 0 -} - -func (w *waitingForAdmissionState) computeAdmitted( - stableIndex uint64, -) [raftpb.NumPriorities]uint64 { - var admitted [raftpb.NumPriorities]uint64 - for i := range w.waiting { - admitted[i] = stableIndex - if len(w.waiting[i]) > 0 { - upperBoundAdmitted := w.waiting[i][0].index - 1 - if upperBoundAdmitted < admitted[i] { - admitted[i] = upperBoundAdmitted - } - } - } - return admitted -} diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission_test.go index 64eef9a11f2d..b12fcd35ecdd 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission_test.go @@ -103,62 +103,3 @@ func readPriority(t *testing.T, d *datadriven.TestData) raftpb.Priority { } return 0 } - -func TestWaitingForAdmissionState(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - var w waitingForAdmissionState - waitingStateString := func() string { - var b strings.Builder - for i := range w.waiting { - fmt.Fprintf(&b, "%s:", raftpb.Priority(i)) - for _, entry := range w.waiting[i] { - fmt.Fprintf(&b, " (i: %d, term: %d)", entry.index, entry.leaderTerm) - } - fmt.Fprintf(&b, "\n") - } - return b.String() - } - argsLeaderIndexPri := func( - t *testing.T, d *datadriven.TestData) (leaderTerm uint64, index uint64, pri raftpb.Priority) { - d.ScanArgs(t, "leader-term", &leaderTerm) - d.ScanArgs(t, "index", &index) - pri = readPriority(t, d) - return - } - datadriven.RunTest(t, datapathutils.TestDataPath(t, "waiting_for_admission_state"), - func(t *testing.T, d *datadriven.TestData) string { - switch d.Cmd { - case "add": - // Example: - // add leader-term=3 index=5 pri=LowPri - // Adds for tracking index 5, with the given priority, - // received at the specified leader-term. - leaderTerm, index, pri := argsLeaderIndexPri(t, d) - w.add(leaderTerm, index, pri) - return waitingStateString() - - case "remove": - // Example: - // remove leader-term=3 index=5 pri=LowPri - // Removes an entry after admission. - leaderTerm, index, pri := argsLeaderIndexPri(t, d) - advanced := w.remove(leaderTerm, index, pri) - return fmt.Sprintf("admittedAdvanced: %t\n%s", advanced, waitingStateString()) - - case "compute-admitted": - // Example: - // compute-admitted stable-index=7 - // Computes the admitted array. - var stableIndex uint64 - d.ScanArgs(t, "stable-index", &stableIndex) - admitted := w.computeAdmitted(stableIndex) - return fmt.Sprintf("admitted: [%d, %d, %d, %d]\n", - admitted[0], admitted[1], admitted[2], admitted[3]) - - default: - return fmt.Sprintf("unknown command: %s", d.Cmd) - } - }) -} diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/waiting_for_admission_state b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/waiting_for_admission_state deleted file mode 100644 index 7e225b7a2699..000000000000 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/waiting_for_admission_state +++ /dev/null @@ -1,143 +0,0 @@ -add leader-term=3 index=5 pri=LowPri ----- -LowPri: (i: 5, term: 3) -NormalPri: -AboveNormalPri: -HighPri: - -add leader-term=3 index=6 pri=HighPri ----- -LowPri: (i: 5, term: 3) -NormalPri: -AboveNormalPri: -HighPri: (i: 6, term: 3) - -add leader-term=3 index=7 pri=HighPri ----- -LowPri: (i: 5, term: 3) -NormalPri: -AboveNormalPri: -HighPri: (i: 6, term: 3) (i: 7, term: 3) - -# Noop, since old term. -remove leader-term=2 index=7 pri=HighPri ----- -admittedAdvanced: false -LowPri: (i: 5, term: 3) -NormalPri: -AboveNormalPri: -HighPri: (i: 6, term: 3) (i: 7, term: 3) - -remove leader-term=3 index=6 pri=HighPri ----- -admittedAdvanced: true -LowPri: (i: 5, term: 3) -NormalPri: -AboveNormalPri: -HighPri: (i: 7, term: 3) - -compute-admitted stable-index=7 ----- -admitted: [4, 7, 7, 6] - -add leader-term=3 index=8 pri=HighPri ----- -LowPri: (i: 5, term: 3) -NormalPri: -AboveNormalPri: -HighPri: (i: 7, term: 3) (i: 8, term: 3) - -compute-admitted stable-index=8 ----- -admitted: [4, 8, 8, 6] - -remove leader-term=3 index=8 pri=HighPri ----- -admittedAdvanced: true -LowPri: (i: 5, term: 3) -NormalPri: -AboveNormalPri: -HighPri: - -compute-admitted stable-index=8 ----- -admitted: [4, 8, 8, 8] - -add leader-term=3 index=9 pri=LowPri ----- -LowPri: (i: 5, term: 3) (i: 9, term: 3) -NormalPri: -AboveNormalPri: -HighPri: - -add leader-term=3 index=11 pri=LowPri ----- -LowPri: (i: 5, term: 3) (i: 9, term: 3) (i: 11, term: 3) -NormalPri: -AboveNormalPri: -HighPri: - -compute-admitted stable-index=5 ----- -admitted: [4, 5, 5, 5] - -# New term, and a suffix is removed. -add leader-term=4 index=10 pri=LowPri ----- -LowPri: (i: 5, term: 3) (i: 9, term: 3) (i: 10, term: 4) -NormalPri: -AboveNormalPri: -HighPri: - -# New term, and a suffix is removed. -add leader-term=5 index=9 pri=LowPri ----- -LowPri: (i: 5, term: 3) (i: 9, term: 5) -NormalPri: -AboveNormalPri: -HighPri: - -# New term, and a suffix is removed. -add leader-term=6 index=7 pri=LowPri ----- -LowPri: (i: 5, term: 3) (i: 7, term: 6) -NormalPri: -AboveNormalPri: -HighPri: - -# New term, no suffix is removed. -add leader-term=7 index=8 pri=LowPri ----- -LowPri: (i: 5, term: 3) (i: 7, term: 6) (i: 8, term: 7) -NormalPri: -AboveNormalPri: -HighPri: - -# Not found, but a prefix is removed. -remove leader-term=7 index=6 pri=LowPri ----- -admittedAdvanced: true -LowPri: (i: 7, term: 6) (i: 8, term: 7) -NormalPri: -AboveNormalPri: -HighPri: - -remove leader-term=6 index=8 pri=LowPri ----- -admittedAdvanced: true -LowPri: (i: 8, term: 7) -NormalPri: -AboveNormalPri: -HighPri: - -compute-admitted stable-index=9 ----- -admitted: [7, 9, 9, 9] - -remove leader-term=7 index=8 pri=LowPri ----- -admittedAdvanced: true -LowPri: -NormalPri: -AboveNormalPri: -HighPri: