replica_rac2: clean up admitted vector sending #130461

Merged
merged 2 commits on Sep 13, 2024
135 changes: 84 additions & 51 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
@@ -23,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
+	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -409,6 +410,10 @@ type processorImpl struct {
 	// destroyed.
 	destroyed bool
 
+	// term is the current raft term. Kept up-to-date with the latest Ready
+	// cycle. It is used to notice transitions out of leadership and back, to
+	// recreate leader.rc.
+	term uint64
 	// leaderID is the ID of the current term leader. Can be zero if unknown.
 	leaderID roachpb.ReplicaID
 	// leaderNodeID and leaderStoreID are a function of leaderID and replicas
@@ -466,12 +471,6 @@ type processorImpl struct {
 	// rc is always updated while holding raftMu and rcReferenceUpdateMu. To
 	// access rc, the code must hold at least one of these mutexes.
 	rc rac2.RangeController
-	// term is used to notice transitions out of leadership and back, to
-	// recreate rc. It is set when rc is created, and is not up-to-date if there
-	// is no rc (which can happen when using the v1 protocol).
-	//
-	// TODO(pav-kv): move this next to leaderID, and always know the term.
-	term uint64
 }
 
 // replMu contains the fields that must be accessed while holding Replica.mu.
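Review note: the two struct hunks above move `term` out of the leader-only state and into `processorImpl` itself, so the processor tracks the raft term on every Ready cycle instead of only while it owns a RangeController. A minimal, self-contained sketch of the resulting pattern (simplified; `termTracker` is a hypothetical type for illustration, not part of this package):

```go
package main

import "fmt"

// termTracker mirrors the pattern this PR adopts: the raft term is tracked
// unconditionally (not only while leader), it never regresses, and a term
// change is the signal to recreate the leader's RangeController.
type termTracker struct {
	term uint64
}

// observe ratchets the tracked term and reports whether it advanced.
// A regression indicates a bug, matching the log.Fatalf in the diff.
func (t *termTracker) observe(term uint64) (changed bool) {
	if term < t.term {
		panic(fmt.Sprintf("term regressed from %d to %d", t.term, term))
	}
	changed = term > t.term
	if changed {
		t.term = term
	}
	return changed
}

func main() {
	var t termTracker
	fmt.Println(t.observe(50)) // true: first term observed
	fmt.Println(t.observe(50)) // false: steady state, nothing to recreate
	fmt.Println(t.observe(51)) // true: leadership changed; recreate leader.rc
}
```

The payoff is that `leader.term` and its reset-to-zero bookkeeping disappear, and a single monotonic field answers both "did leadership change" and "is this admitted vector stale".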
@@ -626,19 +625,25 @@ func (p *processorImpl) makeStateConsistentRaftMuLocked(
 	nextUnstableIndex uint64,
 	leaderID roachpb.ReplicaID,
 	leaseholderID roachpb.ReplicaID,
-	myLeaderTerm uint64,
+	term uint64,
 ) {
+	if term < p.term {
+		log.Fatalf(ctx, "term regressed from %d to %d", p.term, term)
+	}
+	termChanged := term > p.term
+	if termChanged {
+		p.term = term
+	}
 	replicasChanged := p.desc.replicasChanged
 	if replicasChanged {
 		p.desc.replicasChanged = false
 	}
 	if !replicasChanged && leaderID == p.leaderID && leaseholderID == p.leaseholderID &&
-		(p.leader.rc == nil || p.leader.term == myLeaderTerm) {
+		(p.leader.rc == nil || !termChanged) {
 		// Common case.
 		return
 	}
-	// The leader or leaseholder or replicas or myLeaderTerm changed. We set
-	// everything.
+	// The leader or leaseholder or replicas or term changed. We set everything.
 	p.leaderID = leaderID
 	p.leaseholderID = leaseholderID
 	// Set leaderNodeID, leaderStoreID.
@@ -649,19 +654,16 @@
 	rd, ok := p.desc.replicas[leaderID]
 	if !ok {
 		if leaderID == p.opts.ReplicaID {
-			// Is leader, but not in the set of replicas. We expect this
-			// should not be happening anymore, due to
-			// raft.Config.StepDownOnRemoval being set to true. But we
-			// tolerate it.
-			log.Errorf(ctx,
-				"leader=%d is not in the set of replicas=%v",
+			// Is leader, but not in the set of replicas. We expect this should not
+			// be happening anymore, due to raft.Config.StepDownOnRemoval being set
+			// to true. But we tolerate it.
+			log.Errorf(ctx, "leader=%d is not in the set of replicas=%v",
 				leaderID, p.desc.replicas)
 			p.leaderNodeID = p.opts.NodeID
 			p.leaderStoreID = p.opts.StoreID
 		} else {
-			// A follower, which can learn about a leader before it learns
-			// about a config change that includes the leader in the set
-			// of replicas, so ignore.
+			// A follower learns about a leader before it learns about a config
+			// change that includes the leader in the set of replicas. Ignore.
 			p.leaderNodeID = 0
 			p.leaderStoreID = 0
 		}
@@ -681,12 +683,12 @@ func (p *processorImpl) makeStateConsistentRaftMuLocked(
 	if p.enabledWhenLeader == NotEnabledWhenLeader {
 		return
 	}
-	if p.leader.rc != nil && myLeaderTerm > p.leader.term {
+	if p.leader.rc != nil && termChanged {
 		// Need to recreate the RangeController.
 		p.closeLeaderStateRaftMuLocked(ctx)
 	}
 	if p.leader.rc == nil {
-		p.createLeaderStateRaftMuLocked(ctx, myLeaderTerm, nextUnstableIndex)
+		p.createLeaderStateRaftMuLocked(ctx, term, nextUnstableIndex)
 		return
 	}
 	// Existing RangeController.
@@ -703,7 +705,6 @@ func (p *processorImpl) closeLeaderStateRaftMuLocked(ctx context.Context) {
 		return
 	}
 	p.leader.rc.CloseRaftMuLocked(ctx)
-	p.leader.term = 0
 
 	func() {
 		p.leader.pendingAdmittedMu.Lock()
@@ -723,7 +724,7 @@ func (p *processorImpl) createLeaderStateRaftMuLocked(
 	if p.leader.rc != nil {
 		panic("RangeController already exists")
 	}
-	p.leader.term = term
+	p.term = term
 	rc := p.opts.RangeControllerFactory.New(ctx, rangeControllerInitState{
 		replicaSet:  p.desc.replicas,
 		leaseholder: p.leaseholderID,
@@ -764,53 +765,85 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.RaftEvent) {
 	// Grab the state we need in one shot after acquiring Replica mu.
 	var nextUnstableIndex uint64
 	var leaderID, leaseholderID roachpb.ReplicaID
-	var myLeaderTerm uint64
+	var term uint64
 	func() {
 		p.opts.Replica.MuLock()
 		defer p.opts.Replica.MuUnlock()
 		nextUnstableIndex = p.replMu.raftNode.NextUnstableIndexLocked()
 		leaderID = p.replMu.raftNode.LeaderLocked()
 		leaseholderID = p.opts.Replica.LeaseholderMuLocked()
-		if leaderID == p.opts.ReplicaID {
-			myLeaderTerm = p.replMu.raftNode.TermLocked()
-		}
+		term = p.replMu.raftNode.TermLocked()
 	}()
 	if len(e.Entries) > 0 {
 		nextUnstableIndex = e.Entries[0].Index
 	}
 	p.makeStateConsistentRaftMuLocked(
-		ctx, nextUnstableIndex, leaderID, leaseholderID, myLeaderTerm)
+		ctx, nextUnstableIndex, leaderID, leaseholderID, term)
 
 	if !p.isLeaderUsingV2ProcLocked() {
 		return
 	}
 
-	if av, dirty := p.logTracker.admitted(true /* sched */); dirty {
-		if rc := p.leader.rc; rc != nil {
-			// If we are the leader, notify the RangeController about our replica's
-			// new admitted vector.
-			rc.AdmitRaftMuLocked(ctx, p.opts.ReplicaID, av)
-		} else if p.leaderNodeID != 0 {
-			// If the leader is known, piggyback the updated admitted vector to the
-			// message stream going to the leader node.
-			// TODO(pav-kv): must make sure the leader term is the same.
-			p.opts.AdmittedPiggybacker.Add(p.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
-				RangeID:       p.opts.RangeID,
-				ToStoreID:     p.leaderStoreID,
-				FromReplicaID: p.opts.ReplicaID,
-				ToReplicaID:   p.leaderID,
-				Admitted: kvflowcontrolpb.AdmittedState{
-					Term:     av.Term,
-					Admitted: av.Admitted[:],
-				}})
+	p.maybeSendAdmittedRaftMuLocked(ctx)
+	if rc := p.leader.rc; rc != nil {
+		if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
+			log.Errorf(ctx, "error handling raft event: %v", err)
+		}
+	}
 }
 
-	if p.leader.rc != nil {
-		if err := p.leader.rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
-			log.Errorf(ctx, "error handling raft event: %v", err)
-		}
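For context on the new function below: an admitted vector pairs a leader term with per-priority admitted log indexes; that is the `t50/[24 25 25 25]` notation in the testdata diff further down. A rough sketch of its shape, with the exact field layout assumed from how `av` is used in this diff (the real definition lives in the `rac2` package):

```go
package rac2sketch

// AdmittedVector is an assumed shape, inferred from av.Term and
// av.Admitted[:] in the diff above; the actual type is rac2.AdmittedVector.
type AdmittedVector struct {
	// Term is the leader term under which the admitted indexes are valid.
	Term uint64
	// Admitted[pri] is the highest admitted log index at that priority; the
	// testdata traces show four priority classes, e.g. t50/[24 25 25 25].
	Admitted [4]uint64
}
```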
+// maybeSendAdmittedRaftMuLocked sends the admitted vector to the leader, if the
+// vector was updated since the last send. If the replica is the leader, the
+// sending is short-circuited to local processing.
+func (p *processorImpl) maybeSendAdmittedRaftMuLocked(ctx context.Context) {
+	// NB: this resets the scheduling bit in logTracker, which allows us to
+	// schedule this call again when the admitted vector is updated next time.
+	av, dirty := p.logTracker.admitted(true /* sched */)
+	// Don't send the admitted vector if it hasn't been updated since the last
+	// time it was sent.
+	if !dirty {
+		return
+	}
+	// If the admitted vector term is stale, don't send - the leader will drop it.
+	if av.Term < p.term {
+		return
+	}
+	// The admitted vector term can not outpace the raft term because raft would
+	// never accept a write that bumps its log's leader term without bumping the
+	// raft term first. We are holding raftMu between here and when the raft term
+	// was last read, and the logTracker term was last updated (we are still in
+	// the Ready handler), so this invariant could not be violated.
+	//
+	// However, we only check this in debug mode. There is no harm sending an
+	// admitted vector with a future term. This informs the previous leader that
+	// there is a new leader, so it has freedom to react to this information (e.g.
+	// release all flow tokens, or step down from leadership).
+	if buildutil.CrdbTestBuild && av.Term > p.term {
+		panic(errors.AssertionFailedf(
+			"admitted vector ahead of raft term: admitted=%+v, term=%d", av, p.term))
+	}
+	// If we are the leader, send the admitted vector directly to RangeController.
+	if rc := p.leader.rc; rc != nil {
+		rc.AdmitRaftMuLocked(ctx, p.opts.ReplicaID, av)
+		return
+	}
+	// If the leader is unknown, don't send the admitted vector to anyone. This
+	// should normally not happen here, since av.Term == p.term means we had at
+	// least one append from the leader, so we know it. There are cases though
+	// (see Replica.forgetLeaderLocked) when raft deliberately forgets the leader.
+	if p.leaderNodeID == 0 {
+		return
+	}
+	// Piggyback the new admitted vector to the message stream going to the node
+	// containing the leader replica.
+	p.opts.AdmittedPiggybacker.Add(p.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
+		RangeID:       p.opts.RangeID,
+		ToStoreID:     p.leaderStoreID,
+		FromReplicaID: p.opts.ReplicaID,
+		ToReplicaID:   p.leaderID,
+		Admitted: kvflowcontrolpb.AdmittedState{
+			Term:     av.Term,
+			Admitted: av.Admitted[:],
+		}})
+}

 func (p *processorImpl) registerLogAppend(ctx context.Context, e rac2.RaftEvent) {
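The early returns in `maybeSendAdmittedRaftMuLocked` encode a small decision procedure. A condensed sketch under the same inputs (this helper is illustrative only and does not exist in the package):

```go
package main

import "fmt"

// admittedAction mirrors the order of checks in maybeSendAdmittedRaftMuLocked.
// avTerm and procTerm are the admitted vector's and the processor's raft terms.
func admittedAction(avTerm, procTerm uint64, dirty, isLeader, leaderKnown bool) string {
	switch {
	case !dirty:
		return "skip: vector unchanged since the last send"
	case avTerm < procTerm:
		return "drop: stale term, the leader would discard it"
	case avTerm > procTerm:
		// Believed unreachable while raftMu is held; asserted only in test builds.
		return "assert (test builds only)"
	case isLeader:
		return "local: hand the vector to our own RangeController"
	case !leaderKnown:
		return "drop: no destination, e.g. after Replica.forgetLeaderLocked"
	default:
		return "piggyback: attach to the message stream to the leader's node"
	}
}

func main() {
	fmt.Println(admittedAction(50, 51, true, false, true)) // drop: stale term
	fmt.Println(admittedAction(51, 51, true, false, true)) // piggyback
}
```

Compared to the pre-PR inline code, the staleness check replaces the old TODO about matching leader terms, and the unknown-leader case becomes an explicit, documented branch.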
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
@@ -91,6 +91,7 @@ HandleRaftReady:
 RaftNode.NextUnstableIndexLocked() = 25
 RaftNode.LeaderLocked() = 10
 Replica.LeaseholderMuLocked
+RaftNode.TermLocked() = 50
 Replica.MuUnlock
 .....
 AdmitRaftEntries:
@@ -117,6 +118,7 @@ HandleRaftReady:
 RaftNode.NextUnstableIndexLocked() = 26
 RaftNode.LeaderLocked() = 10
 Replica.LeaseholderMuLocked
+RaftNode.TermLocked() = 50
 Replica.MuUnlock
 .....
 AdmitRaftEntries:
@@ -149,6 +151,7 @@ HandleRaftReady:
 RaftNode.NextUnstableIndexLocked() = 26
 RaftNode.LeaderLocked() = 11
 Replica.LeaseholderMuLocked
+RaftNode.TermLocked() = 50
 Replica.MuUnlock
 Piggybacker.Add(n11, [r3,s11,5->11] admitted=t50/[24 25 25 25])
 .....
@@ -172,6 +175,7 @@ HandleRaftReady:
 RaftNode.NextUnstableIndexLocked() = 27
 RaftNode.LeaderLocked() = 11
 Replica.LeaseholderMuLocked
+RaftNode.TermLocked() = 50
 Replica.MuUnlock
 .....
 AdmitRaftEntries:
@@ -190,6 +194,7 @@ HandleRaftReady:
 RaftNode.NextUnstableIndexLocked() = 27
 RaftNode.LeaderLocked() = 11
 Replica.LeaseholderMuLocked
+RaftNode.TermLocked() = 50
 Replica.MuUnlock
 .....

@@ -212,6 +217,7 @@ HandleRaftReady:
 RaftNode.NextUnstableIndexLocked() = 27
 RaftNode.LeaderLocked() = 11
 Replica.LeaseholderMuLocked
+RaftNode.TermLocked() = 50
 Replica.MuUnlock
 Piggybacker.Add(n11, [r3,s11,5->11] admitted=t50/[24 26 25 26])
 .....
@@ -233,6 +239,7 @@ HandleRaftReady:
 RaftNode.NextUnstableIndexLocked() = 27
 RaftNode.LeaderLocked() = 11
 Replica.LeaseholderMuLocked
+RaftNode.TermLocked() = 50
 Replica.MuUnlock
 Piggybacker.Add(n11, [r3,s11,5->11] admitted=t50/[26 26 25 26])
 .....
@@ -256,6 +263,7 @@ HandleRaftReady:
 RaftNode.NextUnstableIndexLocked() = 28
 RaftNode.LeaderLocked() = 11
 Replica.LeaseholderMuLocked
+RaftNode.TermLocked() = 50
 Replica.MuUnlock
 .....
 AdmitRaftEntries:
@@ -285,6 +293,7 @@ HandleRaftReady:
 RaftNode.NextUnstableIndexLocked() = 28
 RaftNode.LeaderLocked() = 11
 Replica.LeaseholderMuLocked
+RaftNode.TermLocked() = 50
 Replica.MuUnlock
 Piggybacker.Add(n11, [r3,s11,5->11] admitted=t50/[26 26 26 26])
 .....
@@ -317,6 +326,7 @@ HandleRaftReady:
 RaftNode.NextUnstableIndexLocked() = 28
 RaftNode.LeaderLocked() = 11
 Replica.LeaseholderMuLocked
+RaftNode.TermLocked() = 51
 Replica.MuUnlock
 .....

@@ -330,6 +340,7 @@ HandleRaftReady:
 RaftNode.NextUnstableIndexLocked() = 28
 RaftNode.LeaderLocked() = 11
 Replica.LeaseholderMuLocked
+RaftNode.TermLocked() = 51
 Replica.MuUnlock
 .....
 AdmitRaftEntries:
@@ -355,6 +366,7 @@ HandleRaftReady:
 RaftNode.NextUnstableIndexLocked() = 28
 RaftNode.LeaderLocked() = 11
 Replica.LeaseholderMuLocked
+RaftNode.TermLocked() = 51
 Replica.MuUnlock
 Piggybacker.Add(n11, [r3,s11,5->11] admitted=t51/[26 26 26 26])
 .....
@@ -791,6 +803,7 @@ HandleRaftReady:
 RaftNode.NextUnstableIndexLocked() = 28
 RaftNode.LeaderLocked() = 0
 Replica.LeaseholderMuLocked
+RaftNode.TermLocked() = 51
 Replica.MuUnlock
 RangeController.CloseRaftMuLocked
 .....
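A note on reading the new trace lines: `RaftNode.TermLocked() = 50` now appears in every `HandleRaftReady` block because the term is read unconditionally, and the `Piggybacker.Add` lines show the admitted vector tagged with that term. A small illustrative program decoding the trace format (field names mirror `PiggybackedAdmittedState` from the diff; the formatting is assumed from the testdata, not the package's actual `String` method):

```go
package main

import "fmt"

// piggybacked holds the same information as the fields populated in
// AdmittedPiggybacker.Add above; illustrative only.
type piggybacked struct {
	rangeID, toStoreID         int
	fromReplicaID, toReplicaID int
	term                       uint64
	admitted                   []uint64
}

func main() {
	// Decodes as: range r3, to store s11, from replica 5 to replica 11,
	// admitted vector at term 51, one admitted index per priority class.
	m := piggybacked{
		rangeID: 3, toStoreID: 11, fromReplicaID: 5, toReplicaID: 11,
		term: 51, admitted: []uint64{26, 26, 26, 26},
	}
	fmt.Printf("[r%d,s%d,%d->%d] admitted=t%d/%v\n",
		m.rangeID, m.toStoreID, m.fromReplicaID, m.toReplicaID, m.term, m.admitted)
	// Output: [r3,s11,5->11] admitted=t51/[26 26 26 26]
}
```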