From c12c6898c6c73a1626da8f756b4243850c80fdd2 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 10 Sep 2024 16:43:02 +0100 Subject: [PATCH 1/8] replica_rac2: rm unused raftNode methods Epic: none Release note: none --- .../kvflowcontrol/replica_rac2/processor.go | 19 ++----------------- .../replica_rac2/processor_test.go | 11 ----------- .../kvflowcontrol/replica_rac2/raft_node.go | 17 ----------------- .../kvserver/kvflowcontrol/replica_rac2/txt | 1 + 4 files changed, 3 insertions(+), 45 deletions(-) create mode 100644 pkg/kv/kvserver/kvflowcontrol/replica_rac2/txt diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index c5d89b2e07b7..3872e3f0dc42 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -88,16 +88,8 @@ type RaftScheduler interface { // reads Raft state at various points while holding raftMu, and expects those // various reads to be mutually consistent. type RaftNode interface { - // EnablePingForAdmittedLaggingLocked is a one time behavioral change made - // to enable pinging for the admitted array when it is lagging match. Once - // changed, this will apply to current and future leadership roles at this - // replica. - EnablePingForAdmittedLaggingLocked() - - // Read-only methods. - - // rac2.RaftInterface is an interface that abstracts the raft.RawNode for use - // in the RangeController. + // RaftInterface is an interface that abstracts the raft.RawNode for use in + // the RangeController. rac2.RaftInterface // TermLocked returns the current term of this replica. TermLocked() uint64 @@ -109,7 +101,6 @@ type RaftNode interface { // guaranteed to be stablestorage, unless this method is called right after // RawNode is initialized. Processor calls this only on initialization. LogMarkLocked() rac2.LogMark - // NextUnstableIndexLocked returns the index of the next entry that will // be sent to local storage. All entries < this index are either stored, // or have been sent to storage. @@ -117,12 +108,6 @@ type RaftNode interface { // NB: NextUnstableIndex can regress when the node accepts appends or // snapshots from a newer leader. NextUnstableIndexLocked() uint64 - - // Mutating methods. - - // StepMsgAppRespForAdmittedLocked steps a MsgAppResp on the leader, which - // may advance its knowledge of a follower's admitted state. - StepMsgAppRespForAdmittedLocked(raftpb.Message) error } // AdmittedPiggybacker is used to enqueue admitted vector messages addressed to diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index 5073572e2760..bd62969d7002 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -104,11 +104,6 @@ type testRaftNode struct { nextUnstableIndex uint64 } -func (rn *testRaftNode) EnablePingForAdmittedLaggingLocked() { - rn.r.mu.AssertHeld() - fmt.Fprintf(rn.b, " RaftNode.EnablePingForAdmittedLaggingLocked\n") -} - func (rn *testRaftNode) TermLocked() uint64 { rn.r.mu.AssertHeld() fmt.Fprintf(rn.b, " RaftNode.TermLocked() = %d\n", rn.term) @@ -133,12 +128,6 @@ func (rn *testRaftNode) NextUnstableIndexLocked() uint64 { return rn.nextUnstableIndex } -func (rn *testRaftNode) StepMsgAppRespForAdmittedLocked(msg raftpb.Message) error { - rn.r.mu.AssertHeld() - fmt.Fprintf(rn.b, " RaftNode.StepMsgAppRespForAdmittedLocked(%s)\n", msgString(msg)) - return nil -} - func (rn *testRaftNode) FollowerStateRaftMuLocked( replicaID roachpb.ReplicaID, ) rac2.FollowerStateInfo { diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go index 6109a29d5f99..93781bf1df12 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go @@ -27,10 +27,6 @@ func NewRaftNode(rn *raft.RawNode) RaftNode { return raftNodeForRACv2{RawNode: rn} } -func (rn raftNodeForRACv2) EnablePingForAdmittedLaggingLocked() { - panic("TODO(pav-kv): implement") -} - func (rn raftNodeForRACv2) TermLocked() uint64 { return rn.Term() } @@ -47,19 +43,6 @@ func (rn raftNodeForRACv2) NextUnstableIndexLocked() uint64 { return rn.NextUnstableIndex() } -func (rn raftNodeForRACv2) GetAdmittedLocked() [raftpb.NumPriorities]uint64 { - // TODO(pav-kv): implement. - return [raftpb.NumPriorities]uint64{} -} - -func (rn raftNodeForRACv2) SetAdmittedLocked([raftpb.NumPriorities]uint64) raftpb.Message { - panic("TODO(pav-kv): implement") -} - -func (rn raftNodeForRACv2) StepMsgAppRespForAdmittedLocked(m raftpb.Message) error { - return rn.RawNode.Step(m) -} - func (rn raftNodeForRACv2) FollowerStateRaftMuLocked( replicaID roachpb.ReplicaID, ) rac2.FollowerStateInfo { diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/txt b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/txt new file mode 100644 index 000000000000..c9639b47aa7c --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/txt @@ -0,0 +1 @@ +bazel coverage --compilation_mode=fastbuild --@io_bazel_rules_go//go/config:cover_format=lcov --combined_report=lcov --instrumentation_filter="//pkg/kv/kvserver/kvflowcontrol/replica_rac2" --test_verbose_timeout_warnings --test_tmpdir=/tmp --sandbox_debug //pkg/kv/kvserver/kvflowcontrol/replica_rac2:small_tests From 771d3afe62942d96f4972ff56eef4dcf63242a1f Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 10 Sep 2024 06:22:08 +0100 Subject: [PATCH 2/8] rac2: compute priority once and if needed Epic: none Release note: none --- pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index d5c8da72ebf1..32f0f5faea9b 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -750,10 +750,10 @@ func (rss *replicaSendStream) returnTokens( ctx context.Context, returned [raftpb.NumPriorities]kvflowcontrol.Tokens, ) { for pri, tokens := range returned { - pri := raftpb.Priority(pri) if tokens > 0 { - rss.parent.evalTokenCounter.Return(ctx, WorkClassFromRaftPriority(pri), tokens) - rss.parent.sendTokenCounter.Return(ctx, WorkClassFromRaftPriority(pri), tokens) + pri := WorkClassFromRaftPriority(raftpb.Priority(pri)) + rss.parent.evalTokenCounter.Return(ctx, pri, tokens) + rss.parent.sendTokenCounter.Return(ctx, pri, tokens) } } } From 42a198381647c41c18b6d4638dc9972b528e0027 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 10 Sep 2024 17:02:09 +0100 Subject: [PATCH 3/8] rac2: introduce AdmitRaftMuLocked stack Epic: none Release note: none --- .../kvflowcontrol/rac2/range_controller.go | 35 +++++++++++++++++-- .../replica_rac2/processor_test.go | 6 ++++ 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index 32f0f5faea9b..5af42d71ccf8 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -60,6 +60,12 @@ type RangeController interface { // // Requires replica.raftMu to be held. HandleSchedulerEventRaftMuLocked(ctx context.Context) error + // AdmitRaftMuLocked handles the notification about the given replica's + // admitted vector change. No-op if the replica is not known, or the admitted + // vector is stale (either in Term, or the indices). + // + // Requires replica.raftMu to be held. + AdmitRaftMuLocked(context.Context, roachpb.ReplicaID, AdmittedVector) // SetReplicasRaftMuLocked sets the replicas of the range. The caller will // never mutate replicas, and neither should the callee. // @@ -375,6 +381,19 @@ func (rc *rangeController) HandleSchedulerEventRaftMuLocked(ctx context.Context) panic("unimplemented") } +// AdmitRaftMuLocked handles the notification about the given replica's +// admitted vector change. No-op if the replica is not known, or the admitted +// vector is stale (either in Term, or the indices). +// +// Requires replica.raftMu to be held. +func (rc *rangeController) AdmitRaftMuLocked( + ctx context.Context, replicaID roachpb.ReplicaID, av AdmittedVector, +) { + if rs, ok := rc.replicaMap[replicaID]; ok { + rs.admit(ctx, av) + } +} + // SetReplicasRaftMuLocked sets the replicas of the range. The caller will // never mutate replicas, and neither should the callee. // @@ -530,6 +549,12 @@ func (rss *replicaSendStream) changeConnectedStateLocked(state connectedState, n rss.mu.connectedStateStart = now } +func (rss *replicaSendStream) admit(ctx context.Context, av AdmittedVector) { + rss.mu.Lock() + defer rss.mu.Unlock() + rss.returnTokens(ctx, rss.mu.tracker.Untrack(av.Term, av.Admitted)) +} + func (rs *replicaState) createReplicaSendStream() { // Must be in StateReplicate on creation. rs.sendStream = &replicaSendStream{ @@ -676,6 +701,12 @@ func (rs *replicaState) handleReadyState( return shouldWaitChange } +func (rs *replicaState) admit(ctx context.Context, av AdmittedVector) { + if rss := rs.sendStream; rss != nil { + rss.admit(ctx, av) + } +} + func (rss *replicaState) closeSendStream(ctx context.Context) { rss.sendStream.mu.Lock() defer rss.sendStream.mu.Unlock() @@ -693,9 +724,7 @@ func (rss *replicaSendStream) makeConsistentInStateReplicate( ctx context.Context, ) (shouldWaitChange bool) { av := rss.parent.parent.opts.AdmittedTracker.GetAdmitted(rss.parent.desc.ReplicaID) - rss.mu.Lock() - defer rss.mu.Unlock() - defer rss.returnTokens(ctx, rss.mu.tracker.Untrack(av.Term, av.Admitted)) + rss.admit(ctx, av) // The leader is always in state replicate. if rss.parent.parent.opts.LocalReplicaID == rss.parent.desc.ReplicaID { diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index bd62969d7002..64820bd46d2f 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -238,6 +238,12 @@ func (c *testRangeController) HandleSchedulerEventRaftMuLocked(ctx context.Conte panic("HandleSchedulerEventRaftMuLocked should not be called when no send-queues") } +func (c *testRangeController) AdmitRaftMuLocked( + _ context.Context, replicaID roachpb.ReplicaID, av rac2.AdmittedVector, +) { + fmt.Fprintf(c.b, " RangeController.AdmitRaftMuLocked(%s, %+v)\n", replicaID, av) +} + func (c *testRangeController) SetReplicasRaftMuLocked( ctx context.Context, replicas rac2.ReplicaSet, ) error { From 245fe20052456892c55d54f80fd584a6d26d232c Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 10 Sep 2024 17:05:53 +0100 Subject: [PATCH 4/8] rac2: plumb AdmitRaftMuLocked to tests Epic: none Release note: none --- .../kvflowcontrol/rac2/range_controller.go | 3 --- .../rac2/range_controller_test.go | 27 ++++++------------- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index 5af42d71ccf8..f3758fe16689 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -723,9 +723,6 @@ func (rss *replicaState) closeSendStream(ctx context.Context) { func (rss *replicaSendStream) makeConsistentInStateReplicate( ctx context.Context, ) (shouldWaitChange bool) { - av := rss.parent.parent.opts.AdmittedTracker.GetAdmitted(rss.parent.desc.ReplicaID) - rss.admit(ctx, av) - // The leader is always in state replicate. if rss.parent.parent.opts.LocalReplicaID == rss.parent.desc.ReplicaID { if rss.mu.connectedState != replicate { diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index 8e49cf6f9e1c..f695561e6dc9 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -319,29 +319,15 @@ func (r *testingRCRange) startWaitForEval(name string, pri admissionpb.WorkPrior }() } -func (r *testingRCRange) admit( - ctx context.Context, - t *testing.T, - storeID roachpb.StoreID, - term uint64, - toIndex uint64, - pri admissionpb.WorkPriority, -) { +func (r *testingRCRange) admit(ctx context.Context, storeID roachpb.StoreID, av AdmittedVector) { r.mu.Lock() - + defer r.mu.Unlock() for _, replica := range r.mu.r.replicaSet { if replica.desc.StoreID == storeID { - replica := replica - replica.av.Admitted[AdmissionToRaftPriority(pri)] = toIndex - replica.av.Term = term - r.mu.r.replicaSet[replica.desc.ReplicaID] = replica - break + r.rc.AdmitRaftMuLocked(ctx, replica.desc.ReplicaID, av) + return } } - - r.mu.Unlock() - // Send an empty raft event in order to trigger potential token return. - require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{})) } type testingRange struct { @@ -825,7 +811,10 @@ func TestRangeController(t *testing.T) { require.True(t, strings.HasPrefix(parts[3], "pri=")) parts[3] = strings.TrimPrefix(strings.TrimSpace(parts[3]), "pri=") pri := parsePriority(t, parts[3]) - state.ranges[lastRangeID].admit(ctx, t, roachpb.StoreID(storeID), uint64(term), uint64(to_index), pri) + + av := AdmittedVector{Term: uint64(term)} + av.Admitted[AdmissionToRaftPriority(pri)] = uint64(to_index) + state.ranges[lastRangeID].admit(ctx, roachpb.StoreID(storeID), av) } } return state.tokenCountsString() From cfc8012a2d325bbf4573270857b62368b669243b Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 10 Sep 2024 17:12:47 +0100 Subject: [PATCH 5/8] rac2: remove AdmittedTracker interface Epic: none Release note: none --- .../kvflowcontrol/rac2/range_controller.go | 12 ------- .../rac2/range_controller_test.go | 13 -------- .../kvflowcontrol/replica_rac2/processor.go | 33 +++++++------------ 3 files changed, 11 insertions(+), 47 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index f3758fe16689..8c1908928df2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -106,17 +106,6 @@ type FollowerStateInfo struct { Next uint64 } -// AdmittedTracker is used to retrieve the latest admitted vector for a -// replica (including the leader). -type AdmittedTracker interface { - // GetAdmitted returns the latest AdmittedVector for replicaID. It returns - // an empty struct if the replicaID is not known. NB: the - // AdmittedVector.Admitted[i] value can transiently advance past - // FollowerStateInfo.Match, since the admitted tracking subsystem is - // separate from Raft. - GetAdmitted(replicaID roachpb.ReplicaID) AdmittedVector -} - // RaftEvent carries a RACv2-relevant subset of raft state sent to storage. type RaftEvent struct { // Term is the leader term on whose behalf the entries or snapshot are @@ -206,7 +195,6 @@ type RangeControllerOptions struct { RaftInterface RaftInterface Clock *hlc.Clock CloseTimerScheduler ProbeToCloseTimerScheduler - AdmittedTracker AdmittedTracker EvalWaitMetrics *EvalWaitMetrics } diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index f695561e6dc9..319b131c71c8 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -239,7 +239,6 @@ func (s *testingRCState) getOrInitRange(r testingRange) *testingRCRange { RaftInterface: testRC, Clock: s.clock, CloseTimerScheduler: s.probeToCloseScheduler, - AdmittedTracker: testRC, EvalWaitMetrics: s.evalMetrics, } @@ -284,17 +283,6 @@ func (r *testingRCRange) FollowerStateRaftMuLocked(replicaID roachpb.ReplicaID) return replica.info } -func (r *testingRCRange) GetAdmitted(replicaID roachpb.ReplicaID) AdmittedVector { - r.mu.Lock() - defer r.mu.Unlock() - - replica, ok := r.mu.r.replicaSet[replicaID] - if !ok { - return AdmittedVector{} - } - return replica.av -} - func (r *testingRCRange) startWaitForEval(name string, pri admissionpb.WorkPriority) { r.mu.Lock() defer r.mu.Unlock() @@ -350,7 +338,6 @@ const invalidTrackerState = tracker.StateSnapshot + 1 type testingReplica struct { desc roachpb.ReplicaDescriptor info FollowerStateInfo - av AdmittedVector } func scanRanges(t *testing.T, input string) []testingRange { diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index 3872e3f0dc42..3b05e2027912 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -165,11 +165,10 @@ type rangeControllerInitState struct { // These fields are required options for the RangeController specific to the // replica and range, rather than the store or node, so we pass them as part // of the range controller init state. - rangeID roachpb.RangeID - tenantID roachpb.TenantID - localReplicaID roachpb.ReplicaID - raftInterface rac2.RaftInterface - admittedTracker rac2.AdmittedTracker + rangeID roachpb.RangeID + tenantID roachpb.TenantID + localReplicaID roachpb.ReplicaID + raftInterface rac2.RaftInterface } // RangeControllerFactory abstracts RangeController creation for testing. @@ -468,8 +467,6 @@ type processorImpl struct { var _ Processor = &processorImpl{} -var _ rac2.AdmittedTracker = &processorImpl{} - func NewProcessor(opts ProcessorOptions) Processor { p := &processorImpl{opts: opts} p.mu.enabledWhenLeader = opts.EnabledWhenLeaderLevel @@ -693,14 +690,13 @@ func (p *processorImpl) createLeaderStateRaftMuLockedProcLocked( p.mu.leader.rcReferenceUpdateMu.Lock() defer p.mu.leader.rcReferenceUpdateMu.Unlock() p.mu.leader.rc = p.opts.RangeControllerFactory.New(ctx, rangeControllerInitState{ - replicaSet: p.raftMu.replicas, - leaseholder: p.mu.leaseholderID, - nextRaftIndex: nextUnstableIndex, - rangeID: p.opts.RangeID, - tenantID: p.raftMu.tenantID, - localReplicaID: p.opts.ReplicaID, - raftInterface: p.raftMu.raftNode, - admittedTracker: p, + replicaSet: p.raftMu.replicas, + leaseholder: p.mu.leaseholderID, + nextRaftIndex: nextUnstableIndex, + rangeID: p.opts.RangeID, + tenantID: p.raftMu.tenantID, + localReplicaID: p.opts.ReplicaID, + raftInterface: p.raftMu.raftNode, }) }() p.mu.leader.term = term @@ -984,12 +980,6 @@ func (p *processorImpl) AdmitForEval( return rc.WaitForEval(ctx, pri) } -// GetAdmitted implements rac2.AdmittedTracker. -func (p *processorImpl) GetAdmitted(replicaID roachpb.ReplicaID) rac2.AdmittedVector { - // TODO(pav-kv): implement - return rac2.AdmittedVector{} -} - // RangeControllerFactoryImpl implements the RangeControllerFactory interface. var _ RangeControllerFactory = RangeControllerFactoryImpl{} @@ -1031,7 +1021,6 @@ func (f RangeControllerFactoryImpl) New( RaftInterface: state.raftInterface, Clock: f.clock, CloseTimerScheduler: f.closeTimerScheduler, - AdmittedTracker: state.admittedTracker, EvalWaitMetrics: f.evalWaitMetrics, }, rac2.RangeControllerInitState{ From f9b36bae94270c8c8cc2e63c752e10c1e501d713 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 10 Sep 2024 17:30:31 +0100 Subject: [PATCH 6/8] rac2: add AdmittedVector.Merge helper Epic: none Release note: none --- .../kvflowcontrol/rac2/log_tracker.go | 17 ++++++++++++++ .../kvflowcontrol/rac2/log_tracker_test.go | 23 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go index 06fb23496847..b37c70f0cb64 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go @@ -34,6 +34,23 @@ type AdmittedVector struct { Admitted [raftpb.NumPriorities]uint64 } +// Merge merges two admitted vectors into one. A higher-term vector always wins. +// If the terms match, the admitted indices are computed as max of the two, for +// each priority. +func (av AdmittedVector) Merge(other AdmittedVector) AdmittedVector { + if other.Term > av.Term { + return other + } else if other.Term < av.Term { + return av + } + for pri, my := range av.Admitted { + if their := other.Admitted[pri]; their > my { + av.Admitted[pri] = their + } + } + return av +} + // LogTracker tracks the durable and logically admitted state of a raft log. // // Writes to a raft log are ordered by LogMark (term, index) where term is the diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker_test.go index 6167068e5fb8..5c521cec2c60 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker_test.go @@ -47,6 +47,29 @@ func (l *LogTracker) check(t *testing.T) { } } +func TestAdmittedVectorMerge(t *testing.T) { + av := func(term uint64, indices ...uint64) AdmittedVector { + av := AdmittedVector{Term: term} + require.Len(t, indices, len(av.Admitted)) + copy(av.Admitted[:], indices) + return av + } + for _, tt := range [][3]AdmittedVector{ + // Different terms. Higher term wins. Merge is symmetric. + {av(3, 10, 11, 12, 12), av(4, 10, 10, 20, 20), av(4, 10, 10, 20, 20)}, + {av(4, 10, 10, 20, 20), av(3, 10, 11, 12, 12), av(4, 10, 10, 20, 20)}, + // Same term. Highest index wins at each priority. + {av(3, 10, 10, 10, 10), av(3, 20, 20, 20, 20), av(3, 20, 20, 20, 20)}, + {av(3, 20, 20, 20, 20), av(3, 10, 10, 10, 10), av(3, 20, 20, 20, 20)}, + {av(3, 10, 11, 12, 12), av(3, 8, 9, 20, 20), av(3, 10, 11, 20, 20)}, + {av(3, 5, 10, 5, 10), av(3, 10, 5, 10, 5), av(3, 10, 10, 10, 10)}, + } { + t.Run("", func(t *testing.T) { + require.Equal(t, tt[2], tt[0].Merge(tt[1])) + }) + } +} + func TestLogTrackerAppend(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) From 253e0dfa755ce66a394ed952fb94d046cc60d40f Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 10 Sep 2024 03:08:49 +0100 Subject: [PATCH 7/8] replica_rac2: integrate admitted tracking on leader This commit plumbs the admitted vectors into the RACv2 Processor. Admitted vectors are applied to replicaSendStream and cause releasing the tokens held by the leader. The admitted vectors are plumbed to replicaSendStream via 3 paths: - The leader's own admitted vector is applied from HandleRaftReadyRaftMuLocked, calling into RangeController.AdmitRaftMuLocked directly. - The followers' admitted vectors in most cases are received via annotated RaftMessageRequest.AdmittedState, which is dispatched from stepRaftGroupRaftMuLocked into the Processor via Processor.AdmitRaftMuLocked method. - The followers' piggybacked admitted vectors from RaftMessageRequestBatch are queued on the Processor via EnqueuePiggybackedAdmittedAtLeader method, and later applied from HandleRaftReadyRaftMuLocked. Epic: none Release note: none --- .../flow_control_raft_transport_test.go | 2 +- pkg/kv/kvserver/flow_control_stores.go | 12 +-- .../kvflowcontrol/replica_rac2/processor.go | 92 ++++++++++++------- .../replica_rac2/processor_test.go | 13 +-- .../replica_rac2/testdata/processor | 13 ++- pkg/kv/kvserver/raft_transport.go | 7 +- pkg/kv/kvserver/replica_raft.go | 9 +- 7 files changed, 86 insertions(+), 62 deletions(-) diff --git a/pkg/kv/kvserver/flow_control_raft_transport_test.go b/pkg/kv/kvserver/flow_control_raft_transport_test.go index b41113a7ee7c..9e70c3e7d182 100644 --- a/pkg/kv/kvserver/flow_control_raft_transport_test.go +++ b/pkg/kv/kvserver/flow_control_raft_transport_test.go @@ -796,6 +796,6 @@ func TestFlowControlRaftTransportV2(t *testing.T) { type noopPiggybackedAdmittedResponseScheduler struct{} func (s noopPiggybackedAdmittedResponseScheduler) ScheduleAdmittedResponseForRangeRACv2( - ctx context.Context, msgs []kvflowcontrolpb.AdmittedResponseForRange, + context.Context, []kvflowcontrolpb.PiggybackedAdmittedState, ) { } diff --git a/pkg/kv/kvserver/flow_control_stores.go b/pkg/kv/kvserver/flow_control_stores.go index 87c5081acb1c..5a8ddd27c2a8 100644 --- a/pkg/kv/kvserver/flow_control_stores.go +++ b/pkg/kv/kvserver/flow_control_stores.go @@ -291,7 +291,7 @@ type StoresForRACv2 interface { // processing. type PiggybackedAdmittedResponseScheduler interface { ScheduleAdmittedResponseForRangeRACv2( - ctx context.Context, msgs []kvflowcontrolpb.AdmittedResponseForRange) + ctx context.Context, msgs []kvflowcontrolpb.PiggybackedAdmittedState) } func MakeStoresForRACv2(stores *Stores) StoresForRACv2 { @@ -332,20 +332,20 @@ func (ss *storesForRACv2) lookup( // ScheduleAdmittedResponseForRangeRACv2 implements PiggybackedAdmittedResponseScheduler. func (ss *storesForRACv2) ScheduleAdmittedResponseForRangeRACv2( - ctx context.Context, msgs []kvflowcontrolpb.AdmittedResponseForRange, + ctx context.Context, msgs []kvflowcontrolpb.PiggybackedAdmittedState, ) { ls := (*Stores)(ss) for _, m := range msgs { - s, err := ls.GetStore(m.LeaderStoreID) + s, err := ls.GetStore(m.ToStoreID) if err != nil { - log.Errorf(ctx, "store %s not found", m.LeaderStoreID) + log.Errorf(ctx, "store %s not found", m.ToStoreID) continue } repl := s.GetReplicaIfExists(m.RangeID) - if repl == nil { + if repl == nil || repl.replicaID != m.ToReplicaID { continue } - repl.flowControlV2.EnqueuePiggybackedAdmittedAtLeader(m.Msg) + repl.flowControlV2.EnqueuePiggybackedAdmittedAtLeader(m.FromReplicaID, m.Admitted) s.scheduler.EnqueueRACv2PiggybackAdmitted(m.RangeID) } } diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index 3b05e2027912..62a2dbc8226e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -353,14 +353,13 @@ type Processor interface { AdmitRaftEntriesRaftMuLocked( ctx context.Context, event rac2.RaftEvent) bool - // EnqueuePiggybackedAdmittedAtLeader is called at the leader when - // receiving a piggybacked MsgAppResp that can advance a follower's - // admitted state. The caller is responsible for scheduling on the raft - // scheduler, such that ProcessPiggybackedAdmittedAtLeaderRaftMuLocked - // gets called soon. - EnqueuePiggybackedAdmittedAtLeader(msg raftpb.Message) + // EnqueuePiggybackedAdmittedAtLeader is called at the leader when receiving a + // piggybacked admitted vector that can advance the given follower's admitted + // state. The caller is responsible for scheduling on the raft scheduler, such + // that ProcessPiggybackedAdmittedAtLeaderRaftMuLocked gets called soon. + EnqueuePiggybackedAdmittedAtLeader(roachpb.ReplicaID, kvflowcontrolpb.AdmittedState) // ProcessPiggybackedAdmittedAtLeaderRaftMuLocked is called to process - // previous enqueued piggybacked MsgAppResp. Returns true if + // previously enqueued piggybacked admitted vectors. Returns true if // HandleRaftReadyRaftMuLocked should be called. // // raftMu is held. @@ -390,6 +389,13 @@ type Processor interface { // AdmittedState returns the vector of admitted log indices. AdmittedState() rac2.AdmittedVector + // AdmitRaftMuLocked is called to notify RACv2 about the admitted vector + // update on the given peer replica. This releases the associated flow tokens + // if the replica is known and the admitted vector covers any. + // + // raftMu is held. + AdmitRaftMuLocked(context.Context, roachpb.ReplicaID, rac2.AdmittedVector) + // AdmitForEval is called to admit work that wants to evaluate at the // leaseholder. // @@ -429,7 +435,14 @@ type processorImpl struct { // State when leader, i.e., when leaderID == opts.ReplicaID, and v2 // protocol is enabled. leader struct { - enqueuedPiggybackedResponses map[roachpb.ReplicaID]raftpb.Message + // pendingAdmitted contains recently delivered admitted vectors. When this + // map is not empty, the range is scheduled for applying these vectors to + // the corresponding streams / token trackers. The map is cleared when the + // admitted vectors are applied. + // + // Invariant: rc == nil ==> len(pendingAdmitted) == 0. + // Invariant: len(pendingAdmitted) != 0 ==> the processing is scheduled. + pendingAdmitted map[roachpb.ReplicaID]rac2.AdmittedVector // Updating the rc reference requires both the enclosing mu and // rcReferenceUpdateMu. Code paths that want to access this // reference only need one of these mutexes. rcReferenceUpdateMu @@ -676,7 +689,7 @@ func (p *processorImpl) closeLeaderStateRaftMuLockedProcLocked(ctx context.Conte defer p.mu.leader.rcReferenceUpdateMu.Unlock() p.mu.leader.rc = nil }() - p.mu.leader.enqueuedPiggybackedResponses = nil + p.mu.leader.pendingAdmitted = nil p.mu.leader.term = 0 } @@ -700,7 +713,7 @@ func (p *processorImpl) createLeaderStateRaftMuLockedProcLocked( }) }() p.mu.leader.term = term - p.mu.leader.enqueuedPiggybackedResponses = map[roachpb.ReplicaID]raftpb.Message{} + p.mu.leader.pendingAdmitted = map[roachpb.ReplicaID]rac2.AdmittedVector{} } // HandleRaftReadyRaftMuLocked implements Processor. @@ -744,20 +757,23 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2. return } - // Piggyback admitted index advancement, if any, to the message stream going - // to the leader node, if we are not the leader. At the leader node, the - // admitted vector is read directly from the log tracker. - if p.mu.leader.rc == nil && p.mu.leaderNodeID != 0 { - // TODO(pav-kv): must make sure the leader term is the same. - if admitted, dirty := p.logTracker.admitted(true /* sched */); dirty { + if av, dirty := p.logTracker.admitted(true /* sched */); dirty { + if rc := p.mu.leader.rc; rc != nil { + // If we are the leader, notify the RangeController about our replica's + // new admitted vector. + rc.AdmitRaftMuLocked(ctx, p.opts.ReplicaID, av) + } else if p.mu.leaderNodeID != 0 { + // If the leader is known, piggyback the updated admitted vector to the + // message stream going to the leader node. + // TODO(pav-kv): must make sure the leader term is the same. p.opts.AdmittedPiggybacker.Add(p.mu.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{ RangeID: p.opts.RangeID, ToStoreID: p.mu.leaderStoreID, FromReplicaID: p.opts.ReplicaID, ToReplicaID: p.mu.leaderID, Admitted: kvflowcontrolpb.AdmittedState{ - Term: admitted.Term, - Admitted: admitted.Admitted[:], + Term: av.Term, + Admitted: av.Admitted[:], }}) } } @@ -864,18 +880,20 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2 } // EnqueuePiggybackedAdmittedAtLeader implements Processor. -func (p *processorImpl) EnqueuePiggybackedAdmittedAtLeader(msg raftpb.Message) { - if roachpb.ReplicaID(msg.To) != p.opts.ReplicaID { - // Ignore message to a stale ReplicaID. - return - } +func (p *processorImpl) EnqueuePiggybackedAdmittedAtLeader( + from roachpb.ReplicaID, state kvflowcontrolpb.AdmittedState, +) { p.mu.Lock() defer p.mu.Unlock() if p.mu.leader.rc == nil { return } - // Only need to keep the latest message from a replica. - p.mu.leader.enqueuedPiggybackedResponses[roachpb.ReplicaID(msg.From)] = msg + var admitted [raftpb.NumPriorities]uint64 + copy(admitted[:], state.Admitted) + // Merge in the received admitted vector. We are only interested in the + // highest admitted marks. Zero value always merges in favour of the new one. + p.mu.leader.pendingAdmitted[from] = p.mu.leader.pendingAdmitted[from].Merge( + rac2.AdmittedVector{Term: state.Term, Admitted: admitted}) } // ProcessPiggybackedAdmittedAtLeaderRaftMuLocked implements Processor. @@ -883,18 +901,13 @@ func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx conte p.opts.Replica.RaftMuAssertHeld() p.mu.Lock() defer p.mu.Unlock() - if p.mu.destroyed || len(p.mu.leader.enqueuedPiggybackedResponses) == 0 || p.raftMu.raftNode == nil { + if p.mu.destroyed || len(p.mu.leader.pendingAdmitted) == 0 { return false } - p.opts.Replica.MuLock() - defer p.opts.Replica.MuUnlock() - for k, m := range p.mu.leader.enqueuedPiggybackedResponses { - err := p.raftMu.raftNode.StepMsgAppRespForAdmittedLocked(m) - if err != nil { - log.Errorf(ctx, "%s", err) - } - delete(p.mu.leader.enqueuedPiggybackedResponses, k) + for replicaID, state := range p.mu.leader.pendingAdmitted { + p.mu.leader.rc.AdmitRaftMuLocked(ctx, replicaID, state) } + clear(p.mu.leader.pendingAdmitted) return true } @@ -954,6 +967,17 @@ func (p *processorImpl) AdmittedState() rac2.AdmittedVector { return admitted } +// AdmitRaftMuLocked implements Processor. +func (p *processorImpl) AdmitRaftMuLocked( + ctx context.Context, replicaID roachpb.ReplicaID, av rac2.AdmittedVector, +) { + p.opts.Replica.RaftMuAssertHeld() + // NB: rc is always updated while raftMu is held. + if rc := p.mu.leader.rc; rc != nil { + rc.AdmitRaftMuLocked(ctx, replicaID, av) + } +} + // AdmitForEval implements Processor. func (p *processorImpl) AdmitForEval( ctx context.Context, pri admissionpb.WorkPriority, ct time.Time, diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index 64820bd46d2f..55799e2d09db 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -155,10 +155,6 @@ func (rn *testRaftNode) print() { rn.term, rn.leader, rn.r.leaseholder, rn.mark, rn.nextUnstableIndex) } -func msgString(msg raftpb.Message) string { - return fmt.Sprintf("type: %s from: %d to: %d", msg.Type.String(), msg.From, msg.To) -} - type testAdmittedPiggybacker struct { b *strings.Builder } @@ -412,12 +408,9 @@ func TestProcessorBasic(t *testing.T) { var from, to uint64 d.ScanArgs(t, "from", &from) d.ScanArgs(t, "to", &to) - msg := raftpb.Message{ - Type: raftpb.MsgAppResp, - To: raftpb.PeerID(to), - From: raftpb.PeerID(from), - } - p.EnqueuePiggybackedAdmittedAtLeader(msg) + // TODO(pav-kv): parse the admitted vector. + p.EnqueuePiggybackedAdmittedAtLeader( + roachpb.ReplicaID(from), kvflowcontrolpb.AdmittedState{}) return builderStr() case "process-piggybacked-admitted": diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor index cc2a59234f7c..1026d09d578b 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor @@ -122,7 +122,7 @@ HandleRaftReady: AdmitRaftEntries: ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{Mark:{Term:50 Index:25} Priority:LowPri}}) = true destroyed-or-leader-using-v2: true -LogTracker [+dirty]: mark:{Term:50 Index:25}, stable:23, admitted:[23 23 23 23] +LogTracker: mark:{Term:50 Index:25}, stable:23, admitted:[23 23 23 23] LowPri: {Term:50 Index:25} # The leader has changed. @@ -463,6 +463,7 @@ HandleRaftReady: Replica.LeaseholderMuLocked RaftNode.TermLocked() = 52 Replica.MuUnlock + RangeController.AdmitRaftMuLocked(5, {Term:52 Admitted:[26 26 26 26]}) RangeController.HandleRaftEventRaftMuLocked([]) ..... @@ -483,6 +484,7 @@ HandleRaftReady: Replica.LeaseholderMuLocked RaftNode.TermLocked() = 52 Replica.MuUnlock + RangeController.AdmitRaftMuLocked(5, {Term:52 Admitted:[28 28 28 28]}) RangeController.HandleRaftEventRaftMuLocked([]) ..... @@ -494,9 +496,7 @@ enqueue-piggybacked-admitted from=25 to=5 process-piggybacked-admitted ---- Replica.RaftMuAssertHeld - Replica.MuLock - RaftNode.StepMsgAppRespForAdmittedLocked(type: MsgAppResp from: 25 to: 5) - Replica.MuUnlock + RangeController.AdmitRaftMuLocked(25, {Term:0 Admitted:[0 0 0 0]}) # Noop. process-piggybacked-admitted @@ -511,6 +511,7 @@ enqueue-piggybacked-admitted from=25 to=6 process-piggybacked-admitted ---- Replica.RaftMuAssertHeld + RangeController.AdmitRaftMuLocked(25, {Term:0 Admitted:[0 0 0 0]}) # We are still the leader, now at a new term. set-raft-state term=53 @@ -739,6 +740,7 @@ HandleRaftReady: Replica.LeaseholderMuLocked RaftNode.TermLocked() = 50 Replica.MuUnlock + RangeController.AdmitRaftMuLocked(5, {Term:50 Admitted:[26 26 26 26]}) RangeController.HandleRaftEventRaftMuLocked([]) ..... @@ -761,7 +763,7 @@ HandleRaftReady: ..... AdmitRaftEntries: destroyed-or-leader-using-v2: true -LogTracker [+dirty]: mark:{Term:50 Index:27}, stable:26, admitted:[26 26 26 26] +LogTracker: mark:{Term:50 Index:27}, stable:26, admitted:[26 26 26 26] # Everything up to 27 is admitted. synced-log term=50 index=27 @@ -778,6 +780,7 @@ HandleRaftReady: Replica.LeaseholderMuLocked RaftNode.TermLocked() = 50 Replica.MuUnlock + RangeController.AdmitRaftMuLocked(5, {Term:50 Admitted:[27 27 27 27]}) RangeController.HandleRaftEventRaftMuLocked([]) ..... diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 5155aa8f8152..96e36310f5cd 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -534,14 +534,15 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer t.kvflowControl.mu.connectionTracker.markStoresConnected(storeIDs) t.kvflowControl.mu.Unlock() if len(batch.AdmittedStates) != 0 { - // TODO(pav-kv): dispatch admitted vectors to RACv2. + // Dispatch the admitted vectors to RACv2. // NB: we do this via this special path instead of using the // handleRaftRequest path since we don't have a full-fledged // RaftMessageRequest for each range (each of these responses could // be for a different range), and because what we need to do w.r.t. // queueing is much simpler (we don't need to worry about queue size - // since we only keep the latest message from each replica). - _ = t.kvflowcontrol2.piggybackedResponseScheduler.ScheduleAdmittedResponseForRangeRACv2 + // since we only keep the highest admitted marks from each replica). + t.kvflowcontrol2.piggybackedResponseScheduler. + ScheduleAdmittedResponseForRangeRACv2(ctx, batch.AdmittedStates) } if len(batch.Requests) == 0 { continue diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 883325844cc8..1bc033f2d533 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -653,9 +653,12 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) } } case raftpb.MsgAppResp: - if req.AdmittedState.Term != 0 { - // TODO(pav-kv): dispatch admitted vector to RACv2 if one is attached. - _ = 0 + // If there is an admitted vector annotation, pass it to RACv2 to release + // the flow control tokens. + if term := req.AdmittedState.Term; term != 0 { + av := rac2.AdmittedVector{Term: term} + copy(av.Admitted[:], req.AdmittedState.Admitted) + r.flowControlV2.AdmitRaftMuLocked(context.TODO(), req.FromReplica.ReplicaID, av) } } err := raftGroup.Step(req.Message) From 49e476cdcb2c05002c4f36392bcee596edf384c8 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 12 Sep 2024 10:56:33 +0100 Subject: [PATCH 8/8] replica_rac2: test piggybacked admitted vectors Epic: none Release note: none --- .../replica_rac2/processor_test.go | 17 +++++++++++++--- .../replica_rac2/testdata/processor | 20 +++++++------------ 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index 55799e2d09db..297a97c74926 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -408,9 +408,20 @@ func TestProcessorBasic(t *testing.T) { var from, to uint64 d.ScanArgs(t, "from", &from) d.ScanArgs(t, "to", &to) - // TODO(pav-kv): parse the admitted vector. - p.EnqueuePiggybackedAdmittedAtLeader( - roachpb.ReplicaID(from), kvflowcontrolpb.AdmittedState{}) + require.Equal(t, p.opts.ReplicaID, roachpb.ReplicaID(to)) + + var term, index, pri int + d.ScanArgs(t, "term", &term) + d.ScanArgs(t, "index", &index) + d.ScanArgs(t, "pri", &pri) + require.Less(t, pri, int(raftpb.NumPriorities)) + as := kvflowcontrolpb.AdmittedState{ + Term: uint64(term), + Admitted: make([]uint64, raftpb.NumPriorities), + } + as.Admitted[pri] = uint64(index) + + p.EnqueuePiggybackedAdmittedAtLeader(roachpb.ReplicaID(from), as) return builderStr() case "process-piggybacked-admitted": diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor index 1026d09d578b..f0cc98aa3fe2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor @@ -360,7 +360,7 @@ HandleRaftReady: ..... # Noop, since not the leader. -enqueue-piggybacked-admitted from=25 to=5 +enqueue-piggybacked-admitted from=25 to=5 term=50 index=24 pri=0 ---- # Noop. @@ -489,29 +489,23 @@ HandleRaftReady: ..... # Enqueue piggybacked admitted vector. -enqueue-piggybacked-admitted from=25 to=5 +enqueue-piggybacked-admitted from=25 to=5 term=52 index=24 pri=0 ---- -# Process it. -process-piggybacked-admitted +# Enqueue another piggybacked admitted vector, it merges into the previous one. +enqueue-piggybacked-admitted from=25 to=5 term=52 index=25 pri=2 ---- - Replica.RaftMuAssertHeld - RangeController.AdmitRaftMuLocked(25, {Term:0 Admitted:[0 0 0 0]}) -# Noop. +# Process it. process-piggybacked-admitted ---- Replica.RaftMuAssertHeld - -# Noop, since the replica id does not match. -enqueue-piggybacked-admitted from=25 to=6 ----- + RangeController.AdmitRaftMuLocked(25, {Term:52 Admitted:[24 0 25 0]}) # Noop. process-piggybacked-admitted ---- Replica.RaftMuAssertHeld - RangeController.AdmitRaftMuLocked(25, {Term:0 Admitted:[0 0 0 0]}) # We are still the leader, now at a new term. set-raft-state term=53 @@ -578,7 +572,7 @@ get-enabled-level enabled-level: v1-encoding # Noop. -enqueue-piggybacked-admitted from=25 to=5 +enqueue-piggybacked-admitted from=25 to=5 term=52 index=24 pri=0 ---- # Noop.