Skip to content

Commit

Permalink
replica_rac2: clean up admitted vector sending
Browse files Browse the repository at this point in the history
Epic: none
Release note: none
  • Loading branch information
pav-kv committed Sep 11, 2024
1 parent 4410a15 commit fa1256b
Showing 1 changed file with 45 additions and 21 deletions.
66 changes: 45 additions & 21 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,34 +763,58 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
if !p.isLeaderUsingV2ProcLocked() {
return
}

if av, dirty := p.logTracker.admitted(true /* sched */); dirty {
if p.mu.leader.rc != nil {
// TODO(pav-kv): if we are the leader, send the admitted vector directly
// for local processing.
} else if p.mu.leaderNodeID != 0 && p.mu.term == av.Term {
// Otherwise, piggyback the new admitted vector to the message stream
// going to the known leader node. The leader accepts only admitted
// vectors in its term coordinate system.
p.opts.AdmittedPiggybacker.Add(p.mu.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: p.opts.RangeID,
ToStoreID: p.mu.leaderStoreID,
FromReplicaID: p.opts.ReplicaID,
ToReplicaID: p.mu.leaderID,
Admitted: kvflowcontrolpb.AdmittedState{
Term: av.Term,
Admitted: av.Admitted[:],
}})
}
}

p.maybeSendAdmittedRaftMuLockedProcLocked()
if p.mu.leader.rc != nil {
if err := p.mu.leader.rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
log.Errorf(ctx, "error handling raft event: %v", err)
}
}
}

// maybeSendAdmittedRaftMuLockedProcLocked sends the admitted vector to the
// leader, if the vector was updated since the last send. If the replica is the
// leader, the sending is short-circuited to local processing.
func (p *processorImpl) maybeSendAdmittedRaftMuLockedProcLocked() {
// NB: this resets the scheduling bit in logTracker, which allows us to
// schedule this call again when the admitted vector is updated next time.
av, dirty := p.logTracker.admitted(true /* sched */)
// Don't send the admitted vector if it hasn't been updated since the last
// time it was sent.
if !dirty {
return
}
// The leader only accepts admitted vectors in its term coordinate system, so
// skip if the term does not match - the message is a no-op. Note that av.Term
// can only lag behind p.mu.term here, and never leads it. If we are the
// leader, it always matches.
if av.Term != p.mu.term {
return
}
if p.mu.leader.rc != nil {
// TODO(pav-kv): if we are the leader, send the admitted vector directly for
// local processing.
return
}
// If the leader is unknown, don't send the admitted vector to anyone. This
// should normally not happen here, since av.Term == p.mu.term means we had at
// least one append from the leader, so we know it. There are cases though
// (see Replica.forgetLeaderLocked) when raft deliberately forgets the leader.
if p.mu.leaderNodeID == 0 {
return
}
// Piggyback the new admitted vector to the message stream going to the node
// containing the leader replica.
p.opts.AdmittedPiggybacker.Add(p.mu.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: p.opts.RangeID,
ToStoreID: p.mu.leaderStoreID,
FromReplicaID: p.opts.ReplicaID,
ToReplicaID: p.mu.leaderID,
Admitted: kvflowcontrolpb.AdmittedState{
Term: av.Term,
Admitted: av.Admitted[:],
}})
}

func (p *processorImpl) registerLogAppend(ctx context.Context, e rac2.RaftEvent) {
if len(e.Entries) == 0 {
return
Expand Down

0 comments on commit fa1256b

Please sign in to comment.