diff --git a/pkg/kv/kvserver/flow_control_raft_transport_test.go b/pkg/kv/kvserver/flow_control_raft_transport_test.go index b41113a7ee7c..9e70c3e7d182 100644 --- a/pkg/kv/kvserver/flow_control_raft_transport_test.go +++ b/pkg/kv/kvserver/flow_control_raft_transport_test.go @@ -796,6 +796,6 @@ func TestFlowControlRaftTransportV2(t *testing.T) { type noopPiggybackedAdmittedResponseScheduler struct{} func (s noopPiggybackedAdmittedResponseScheduler) ScheduleAdmittedResponseForRangeRACv2( - ctx context.Context, msgs []kvflowcontrolpb.AdmittedResponseForRange, + context.Context, []kvflowcontrolpb.PiggybackedAdmittedState, ) { } diff --git a/pkg/kv/kvserver/flow_control_stores.go b/pkg/kv/kvserver/flow_control_stores.go index 87c5081acb1c..5a8ddd27c2a8 100644 --- a/pkg/kv/kvserver/flow_control_stores.go +++ b/pkg/kv/kvserver/flow_control_stores.go @@ -291,7 +291,7 @@ type StoresForRACv2 interface { // processing. type PiggybackedAdmittedResponseScheduler interface { ScheduleAdmittedResponseForRangeRACv2( - ctx context.Context, msgs []kvflowcontrolpb.AdmittedResponseForRange) + ctx context.Context, msgs []kvflowcontrolpb.PiggybackedAdmittedState) } func MakeStoresForRACv2(stores *Stores) StoresForRACv2 { @@ -332,20 +332,20 @@ func (ss *storesForRACv2) lookup( // ScheduleAdmittedResponseForRangeRACv2 implements PiggybackedAdmittedResponseScheduler. func (ss *storesForRACv2) ScheduleAdmittedResponseForRangeRACv2( - ctx context.Context, msgs []kvflowcontrolpb.AdmittedResponseForRange, + ctx context.Context, msgs []kvflowcontrolpb.PiggybackedAdmittedState, ) { ls := (*Stores)(ss) for _, m := range msgs { - s, err := ls.GetStore(m.LeaderStoreID) + s, err := ls.GetStore(m.ToStoreID) if err != nil { - log.Errorf(ctx, "store %s not found", m.LeaderStoreID) + log.Errorf(ctx, "store %s not found", m.ToStoreID) continue } repl := s.GetReplicaIfExists(m.RangeID) - if repl == nil { + if repl == nil || repl.replicaID != m.ToReplicaID { continue } - repl.flowControlV2.EnqueuePiggybackedAdmittedAtLeader(m.Msg) + repl.flowControlV2.EnqueuePiggybackedAdmittedAtLeader(m.FromReplicaID, m.Admitted) s.scheduler.EnqueueRACv2PiggybackAdmitted(m.RangeID) } } diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index 3b05e2027912..62a2dbc8226e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -353,14 +353,13 @@ type Processor interface { AdmitRaftEntriesRaftMuLocked( ctx context.Context, event rac2.RaftEvent) bool - // EnqueuePiggybackedAdmittedAtLeader is called at the leader when - // receiving a piggybacked MsgAppResp that can advance a follower's - // admitted state. The caller is responsible for scheduling on the raft - // scheduler, such that ProcessPiggybackedAdmittedAtLeaderRaftMuLocked - // gets called soon. - EnqueuePiggybackedAdmittedAtLeader(msg raftpb.Message) + // EnqueuePiggybackedAdmittedAtLeader is called at the leader when receiving a + // piggybacked admitted vector that can advance the given follower's admitted + // state. The caller is responsible for scheduling on the raft scheduler, such + // that ProcessPiggybackedAdmittedAtLeaderRaftMuLocked gets called soon. + EnqueuePiggybackedAdmittedAtLeader(roachpb.ReplicaID, kvflowcontrolpb.AdmittedState) // ProcessPiggybackedAdmittedAtLeaderRaftMuLocked is called to process - // previous enqueued piggybacked MsgAppResp. Returns true if + // previously enqueued piggybacked admitted vectors. 
Returns true if // HandleRaftReadyRaftMuLocked should be called. // // raftMu is held. @@ -390,6 +389,13 @@ type Processor interface { // AdmittedState returns the vector of admitted log indices. AdmittedState() rac2.AdmittedVector + // AdmitRaftMuLocked is called to notify RACv2 about the admitted vector + // update on the given peer replica. This releases the associated flow tokens + // if the replica is known and the admitted vector covers any. + // + // raftMu is held. + AdmitRaftMuLocked(context.Context, roachpb.ReplicaID, rac2.AdmittedVector) + // AdmitForEval is called to admit work that wants to evaluate at the // leaseholder. // @@ -429,7 +435,14 @@ type processorImpl struct { // State when leader, i.e., when leaderID == opts.ReplicaID, and v2 // protocol is enabled. leader struct { - enqueuedPiggybackedResponses map[roachpb.ReplicaID]raftpb.Message + // pendingAdmitted contains recently delivered admitted vectors. When this + // map is not empty, the range is scheduled for applying these vectors to + // the corresponding streams / token trackers. The map is cleared when the + // admitted vectors are applied. + // + // Invariant: rc == nil ==> len(pendingAdmitted) == 0. + // Invariant: len(pendingAdmitted) != 0 ==> the processing is scheduled. + pendingAdmitted map[roachpb.ReplicaID]rac2.AdmittedVector // Updating the rc reference requires both the enclosing mu and // rcReferenceUpdateMu. Code paths that want to access this // reference only need one of these mutexes. rcReferenceUpdateMu @@ -676,7 +689,7 @@ func (p *processorImpl) closeLeaderStateRaftMuLockedProcLocked(ctx context.Conte defer p.mu.leader.rcReferenceUpdateMu.Unlock() p.mu.leader.rc = nil }() - p.mu.leader.enqueuedPiggybackedResponses = nil + p.mu.leader.pendingAdmitted = nil p.mu.leader.term = 0 } @@ -700,7 +713,7 @@ func (p *processorImpl) createLeaderStateRaftMuLockedProcLocked( }) }() p.mu.leader.term = term - p.mu.leader.enqueuedPiggybackedResponses = map[roachpb.ReplicaID]raftpb.Message{} + p.mu.leader.pendingAdmitted = map[roachpb.ReplicaID]rac2.AdmittedVector{} } // HandleRaftReadyRaftMuLocked implements Processor. @@ -744,20 +757,23 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2. return } - // Piggyback admitted index advancement, if any, to the message stream going - // to the leader node, if we are not the leader. At the leader node, the - // admitted vector is read directly from the log tracker. - if p.mu.leader.rc == nil && p.mu.leaderNodeID != 0 { - // TODO(pav-kv): must make sure the leader term is the same. - if admitted, dirty := p.logTracker.admitted(true /* sched */); dirty { + if av, dirty := p.logTracker.admitted(true /* sched */); dirty { + if rc := p.mu.leader.rc; rc != nil { + // If we are the leader, notify the RangeController about our replica's + // new admitted vector. + rc.AdmitRaftMuLocked(ctx, p.opts.ReplicaID, av) + } else if p.mu.leaderNodeID != 0 { + // If the leader is known, piggyback the updated admitted vector to the + // message stream going to the leader node. + // TODO(pav-kv): must make sure the leader term is the same. 
p.opts.AdmittedPiggybacker.Add(p.mu.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{ RangeID: p.opts.RangeID, ToStoreID: p.mu.leaderStoreID, FromReplicaID: p.opts.ReplicaID, ToReplicaID: p.mu.leaderID, Admitted: kvflowcontrolpb.AdmittedState{ - Term: admitted.Term, - Admitted: admitted.Admitted[:], + Term: av.Term, + Admitted: av.Admitted[:], }}) } } @@ -864,18 +880,20 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2 } // EnqueuePiggybackedAdmittedAtLeader implements Processor. -func (p *processorImpl) EnqueuePiggybackedAdmittedAtLeader(msg raftpb.Message) { - if roachpb.ReplicaID(msg.To) != p.opts.ReplicaID { - // Ignore message to a stale ReplicaID. - return - } +func (p *processorImpl) EnqueuePiggybackedAdmittedAtLeader( + from roachpb.ReplicaID, state kvflowcontrolpb.AdmittedState, +) { p.mu.Lock() defer p.mu.Unlock() if p.mu.leader.rc == nil { return } - // Only need to keep the latest message from a replica. - p.mu.leader.enqueuedPiggybackedResponses[roachpb.ReplicaID(msg.From)] = msg + var admitted [raftpb.NumPriorities]uint64 + copy(admitted[:], state.Admitted) + // Merge in the received admitted vector. We are only interested in the + // highest admitted marks. Zero value always merges in favour of the new one. + p.mu.leader.pendingAdmitted[from] = p.mu.leader.pendingAdmitted[from].Merge( + rac2.AdmittedVector{Term: state.Term, Admitted: admitted}) } // ProcessPiggybackedAdmittedAtLeaderRaftMuLocked implements Processor. @@ -883,18 +901,13 @@ func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx conte p.opts.Replica.RaftMuAssertHeld() p.mu.Lock() defer p.mu.Unlock() - if p.mu.destroyed || len(p.mu.leader.enqueuedPiggybackedResponses) == 0 || p.raftMu.raftNode == nil { + if p.mu.destroyed || len(p.mu.leader.pendingAdmitted) == 0 { return false } - p.opts.Replica.MuLock() - defer p.opts.Replica.MuUnlock() - for k, m := range p.mu.leader.enqueuedPiggybackedResponses { - err := p.raftMu.raftNode.StepMsgAppRespForAdmittedLocked(m) - if err != nil { - log.Errorf(ctx, "%s", err) - } - delete(p.mu.leader.enqueuedPiggybackedResponses, k) + for replicaID, state := range p.mu.leader.pendingAdmitted { + p.mu.leader.rc.AdmitRaftMuLocked(ctx, replicaID, state) } + clear(p.mu.leader.pendingAdmitted) return true } @@ -954,6 +967,17 @@ func (p *processorImpl) AdmittedState() rac2.AdmittedVector { return admitted } +// AdmitRaftMuLocked implements Processor. +func (p *processorImpl) AdmitRaftMuLocked( + ctx context.Context, replicaID roachpb.ReplicaID, av rac2.AdmittedVector, +) { + p.opts.Replica.RaftMuAssertHeld() + // NB: rc is always updated while raftMu is held. + if rc := p.mu.leader.rc; rc != nil { + rc.AdmitRaftMuLocked(ctx, replicaID, av) + } +} + // AdmitForEval implements Processor. 
func (p *processorImpl) AdmitForEval( ctx context.Context, pri admissionpb.WorkPriority, ct time.Time, diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index 64820bd46d2f..55799e2d09db 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -155,10 +155,6 @@ func (rn *testRaftNode) print() { rn.term, rn.leader, rn.r.leaseholder, rn.mark, rn.nextUnstableIndex) } -func msgString(msg raftpb.Message) string { - return fmt.Sprintf("type: %s from: %d to: %d", msg.Type.String(), msg.From, msg.To) -} - type testAdmittedPiggybacker struct { b *strings.Builder } @@ -412,12 +408,9 @@ func TestProcessorBasic(t *testing.T) { var from, to uint64 d.ScanArgs(t, "from", &from) d.ScanArgs(t, "to", &to) - msg := raftpb.Message{ - Type: raftpb.MsgAppResp, - To: raftpb.PeerID(to), - From: raftpb.PeerID(from), - } - p.EnqueuePiggybackedAdmittedAtLeader(msg) + // TODO(pav-kv): parse the admitted vector. + p.EnqueuePiggybackedAdmittedAtLeader( + roachpb.ReplicaID(from), kvflowcontrolpb.AdmittedState{}) return builderStr() case "process-piggybacked-admitted": diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor index cc2a59234f7c..1026d09d578b 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor @@ -122,7 +122,7 @@ HandleRaftReady: AdmitRaftEntries: ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{Mark:{Term:50 Index:25} Priority:LowPri}}) = true destroyed-or-leader-using-v2: true -LogTracker [+dirty]: mark:{Term:50 Index:25}, stable:23, admitted:[23 23 23 23] +LogTracker: mark:{Term:50 Index:25}, stable:23, admitted:[23 23 23 23] LowPri: {Term:50 Index:25} # The leader has changed. @@ -463,6 +463,7 @@ HandleRaftReady: Replica.LeaseholderMuLocked RaftNode.TermLocked() = 52 Replica.MuUnlock + RangeController.AdmitRaftMuLocked(5, {Term:52 Admitted:[26 26 26 26]}) RangeController.HandleRaftEventRaftMuLocked([]) ..... @@ -483,6 +484,7 @@ HandleRaftReady: Replica.LeaseholderMuLocked RaftNode.TermLocked() = 52 Replica.MuUnlock + RangeController.AdmitRaftMuLocked(5, {Term:52 Admitted:[28 28 28 28]}) RangeController.HandleRaftEventRaftMuLocked([]) ..... @@ -494,9 +496,7 @@ enqueue-piggybacked-admitted from=25 to=5 process-piggybacked-admitted ---- Replica.RaftMuAssertHeld - Replica.MuLock - RaftNode.StepMsgAppRespForAdmittedLocked(type: MsgAppResp from: 25 to: 5) - Replica.MuUnlock + RangeController.AdmitRaftMuLocked(25, {Term:0 Admitted:[0 0 0 0]}) # Noop. process-piggybacked-admitted @@ -511,6 +511,7 @@ enqueue-piggybacked-admitted from=25 to=6 process-piggybacked-admitted ---- Replica.RaftMuAssertHeld + RangeController.AdmitRaftMuLocked(25, {Term:0 Admitted:[0 0 0 0]}) # We are still the leader, now at a new term. set-raft-state term=53 @@ -739,6 +740,7 @@ HandleRaftReady: Replica.LeaseholderMuLocked RaftNode.TermLocked() = 50 Replica.MuUnlock + RangeController.AdmitRaftMuLocked(5, {Term:50 Admitted:[26 26 26 26]}) RangeController.HandleRaftEventRaftMuLocked([]) ..... @@ -761,7 +763,7 @@ HandleRaftReady: ..... 
AdmitRaftEntries: destroyed-or-leader-using-v2: true -LogTracker [+dirty]: mark:{Term:50 Index:27}, stable:26, admitted:[26 26 26 26] +LogTracker: mark:{Term:50 Index:27}, stable:26, admitted:[26 26 26 26] # Everything up to 27 is admitted. synced-log term=50 index=27 @@ -778,6 +780,7 @@ HandleRaftReady: Replica.LeaseholderMuLocked RaftNode.TermLocked() = 50 Replica.MuUnlock + RangeController.AdmitRaftMuLocked(5, {Term:50 Admitted:[27 27 27 27]}) RangeController.HandleRaftEventRaftMuLocked([]) ..... diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 5155aa8f8152..96e36310f5cd 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -534,14 +534,15 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer t.kvflowControl.mu.connectionTracker.markStoresConnected(storeIDs) t.kvflowControl.mu.Unlock() if len(batch.AdmittedStates) != 0 { - // TODO(pav-kv): dispatch admitted vectors to RACv2. + // Dispatch the admitted vectors to RACv2. // NB: we do this via this special path instead of using the // handleRaftRequest path since we don't have a full-fledged // RaftMessageRequest for each range (each of these responses could // be for a different range), and because what we need to do w.r.t. // queueing is much simpler (we don't need to worry about queue size - // since we only keep the latest message from each replica). - _ = t.kvflowcontrol2.piggybackedResponseScheduler.ScheduleAdmittedResponseForRangeRACv2 + // since we only keep the highest admitted marks from each replica). + t.kvflowcontrol2.piggybackedResponseScheduler. + ScheduleAdmittedResponseForRangeRACv2(ctx, batch.AdmittedStates) } if len(batch.Requests) == 0 { continue diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 883325844cc8..1bc033f2d533 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -653,9 +653,12 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) } } case raftpb.MsgAppResp: - if req.AdmittedState.Term != 0 { - // TODO(pav-kv): dispatch admitted vector to RACv2 if one is attached. - _ = 0 + // If there is an admitted vector annotation, pass it to RACv2 to release + // the flow control tokens. + if term := req.AdmittedState.Term; term != 0 { + av := rac2.AdmittedVector{Term: term} + copy(av.Admitted[:], req.AdmittedState.Admitted) + r.flowControlV2.AdmitRaftMuLocked(context.TODO(), req.FromReplica.ReplicaID, av) } } err := raftGroup.Step(req.Message)
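
To make the new leader-side flow concrete, below is a minimal, runnable Go sketch of the merge-and-drain pattern that `EnqueuePiggybackedAdmittedAtLeader` and `ProcessPiggybackedAdmittedAtLeaderRaftMuLocked` implement in this diff. It is an illustration, not the real `rac2` package API: the names `admittedVector`, `merge`, `numPriorities`, and `enqueue` are stand-ins, and the merge semantics (higher term wins; elementwise max at equal terms) are an assumption inferred from the comments above ("we are only interested in the highest admitted marks"; "Zero value always merges in favour of the new one").

```go
// Sketch only: models the pendingAdmitted map from processorImpl.mu.leader.
package main

import "fmt"

const numPriorities = 4 // stands in for raftpb.NumPriorities

type replicaID uint64

// admittedVector mirrors rac2.AdmittedVector: one admitted log index per
// priority, valid at the given leader term.
type admittedVector struct {
	Term     uint64
	Admitted [numPriorities]uint64
}

// merge keeps the vector with the higher term and, at equal terms, takes the
// elementwise max. Merging into a zero value therefore always yields the
// incoming vector, which is why the map lookup below needs no "ok" check.
func (a admittedVector) merge(b admittedVector) admittedVector {
	if a.Term < b.Term {
		return b
	} else if a.Term > b.Term {
		return a
	}
	for i, x := range b.Admitted {
		if x > a.Admitted[i] {
			a.Admitted[i] = x
		}
	}
	return a
}

func main() {
	// pendingAdmitted accumulates the latest vector per follower between raft
	// scheduler runs, as EnqueuePiggybackedAdmittedAtLeader does above.
	pending := map[replicaID]admittedVector{}
	enqueue := func(from replicaID, av admittedVector) {
		pending[from] = pending[from].merge(av)
	}
	enqueue(25, admittedVector{Term: 52, Admitted: [numPriorities]uint64{26, 26, 26, 26}})
	enqueue(25, admittedVector{Term: 52, Admitted: [numPriorities]uint64{25, 28, 25, 28}})

	// Drain, as ProcessPiggybackedAdmittedAtLeaderRaftMuLocked does: apply each
	// vector (printing here stands in for RangeController.AdmitRaftMuLocked)
	// and clear the map.
	for id, av := range pending {
		fmt.Printf("AdmitRaftMuLocked(%d, %+v)\n", id, av)
	}
	clear(pending) // Go 1.21+ builtin
}
```

Because each follower's entry is merged in place rather than queued, the pending state is bounded by the number of replicas; this is the property the `raft_transport.go` comment relies on when it says there is no queue-size concern, since only the highest admitted marks from each replica are kept.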