Skip to content

Commit

Permalink
replica_rac2: clean up admitted vector sending
Browse files Browse the repository at this point in the history
Epic: none
Release note: none
  • Loading branch information
pav-kv committed Sep 12, 2024
1 parent b3963fc commit 1d00c8f
Showing 1 changed file with 46 additions and 23 deletions.
69 changes: 46 additions & 23 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,33 +782,56 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
if !p.isLeaderUsingV2ProcLocked() {
return
}

if av, dirty := p.logTracker.admitted(true /* sched */); dirty {
if rc := p.leader.rc; rc != nil {
// If we are the leader, notify the RangeController about our replica's
// new admitted vector.
rc.AdmitRaftMuLocked(ctx, p.opts.ReplicaID, av)
} else if p.leaderNodeID != 0 {
// If the leader is known, piggyback the updated admitted vector to the
// message stream going to the leader node.
// TODO(pav-kv): must make sure the leader term is the same.
p.opts.AdmittedPiggybacker.Add(p.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: p.opts.RangeID,
ToStoreID: p.leaderStoreID,
FromReplicaID: p.opts.ReplicaID,
ToReplicaID: p.leaderID,
Admitted: kvflowcontrolpb.AdmittedState{
Term: av.Term,
Admitted: av.Admitted[:],
}})
p.maybeSendAdmittedRaftMuLocked(ctx)
if rc := p.leader.rc; rc != nil {
if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
log.Errorf(ctx, "error handling raft event: %v", err)
}
}
}

if p.leader.rc != nil {
if err := p.leader.rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
log.Errorf(ctx, "error handling raft event: %v", err)
}
// maybeSendAdmittedRaftMuLocked sends the admitted vector to the leader, if the
// vector was updated since the last send. If the replica is the leader, the
// sending is short-circuited to local processing.
func (p *processorImpl) maybeSendAdmittedRaftMuLocked(ctx context.Context) {
// NB: this resets the scheduling bit in logTracker, which allows us to
// schedule this call again when the admitted vector is updated next time.
av, dirty := p.logTracker.admitted(true /* sched */)
// Don't send the admitted vector if it hasn't been updated since the last
// time it was sent.
if !dirty {
return
}
// The leader only accepts admitted vectors in its term coordinate system, so
// skip if the term does not match - the message is a no-op. Note that av.Term
// can only lag behind p.mu.term here, and never leads it. If we are the
// leader, it always matches.
if av.Term != p.term {
return
}
// If we are the leader, send the admitted vector directly to RangeController.
if rc := p.leader.rc; rc != nil {
rc.AdmitRaftMuLocked(ctx, p.opts.ReplicaID, av)
return
}
// If the leader is unknown, don't send the admitted vector to anyone. This
// should normally not happen here, since av.Term == p.mu.term means we had at
// least one append from the leader, so we know it. There are cases though
// (see Replica.forgetLeaderLocked) when raft deliberately forgets the leader.
if p.leaderNodeID == 0 {
return
}
// Piggyback the new admitted vector to the message stream going to the node
// containing the leader replica.
p.opts.AdmittedPiggybacker.Add(p.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
RangeID: p.opts.RangeID,
ToStoreID: p.leaderStoreID,
FromReplicaID: p.opts.ReplicaID,
ToReplicaID: p.leaderID,
Admitted: kvflowcontrolpb.AdmittedState{
Term: av.Term,
Admitted: av.Admitted[:],
}})
}

func (p *processorImpl) registerLogAppend(ctx context.Context, e rac2.RaftEvent) {
Expand Down

0 comments on commit 1d00c8f

Please sign in to comment.