From b3963fc8b059a136fdf710294e20486535bd26a7 Mon Sep 17 00:00:00 2001
From: Pavel Kalinnikov
Date: Wed, 11 Sep 2024 04:04:21 +0100
Subject: [PATCH 1/2] replica_rac2: track raft term in processorImpl

Epic: none
Release note: none
---
 .../kvflowcontrol/replica_rac2/processor.go   | 54 +++++++++----------
 .../replica_rac2/testdata/processor           | 13 +++++
 2 files changed, 39 insertions(+), 28 deletions(-)

diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
index 3c76ddbb5587..997217b9d6c8 100644
--- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
+++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
@@ -409,6 +409,10 @@ type processorImpl struct {
 	// destroyed.
 	destroyed bool
 
+	// term is the current raft term. Kept up-to-date with the latest Ready
+	// cycle. It is used to notice transitions out of leadership and back, to
+	// recreate leader.rc.
+	term uint64
 	// leaderID is the ID of the current term leader. Can be zero if unknown.
 	leaderID roachpb.ReplicaID
 	// leaderNodeID and leaderStoreID are a function of leaderID and replicas
@@ -466,12 +470,6 @@ type processorImpl struct {
 		// rc is always updated while holding raftMu and rcReferenceUpdateMu. To
 		// access rc, the code must hold at least one of these mutexes.
 		rc rac2.RangeController
-		// term is used to notice transitions out of leadership and back, to
-		// recreate rc. It is set when rc is created, and is not up-to-date if there
-		// is no rc (which can happen when using the v1 protocol).
-		//
-		// TODO(pav-kv): move this next to leaderID, and always know the term.
-		term uint64
 	}
 
 	// replMu contains the fields that must be accessed while holding Replica.mu.
@@ -626,19 +624,25 @@ func (p *processorImpl) makeStateConsistentRaftMuLocked(
 	nextUnstableIndex uint64,
 	leaderID roachpb.ReplicaID,
 	leaseholderID roachpb.ReplicaID,
-	myLeaderTerm uint64,
+	term uint64,
 ) {
+	if term < p.term {
+		log.Fatalf(ctx, "term regressed from %d to %d", p.term, term)
+	}
+	termChanged := term > p.term
+	if termChanged {
+		p.term = term
+	}
 	replicasChanged := p.desc.replicasChanged
 	if replicasChanged {
 		p.desc.replicasChanged = false
 	}
 	if !replicasChanged && leaderID == p.leaderID && leaseholderID == p.leaseholderID &&
-		(p.leader.rc == nil || p.leader.term == myLeaderTerm) {
+		(p.leader.rc == nil || !termChanged) {
 		// Common case.
 		return
 	}
-	// The leader or leaseholder or replicas or myLeaderTerm changed. We set
-	// everything.
+	// The leader or leaseholder or replicas or term changed. We set everything.
 	p.leaderID = leaderID
 	p.leaseholderID = leaseholderID
 	// Set leaderNodeID, leaderStoreID.
@@ -649,19 +653,16 @@ func (p *processorImpl) makeStateConsistentRaftMuLocked(
 		rd, ok := p.desc.replicas[leaderID]
 		if !ok {
 			if leaderID == p.opts.ReplicaID {
-				// Is leader, but not in the set of replicas. We expect this
-				// should not be happening anymore, due to
-				// raft.Config.StepDownOnRemoval being set to true. But we
-				// tolerate it.
-				log.Errorf(ctx,
-					"leader=%d is not in the set of replicas=%v",
+				// Is leader, but not in the set of replicas. We expect this should not
+				// be happening anymore, due to raft.Config.StepDownOnRemoval being set
+				// to true. But we tolerate it.
+				log.Errorf(ctx, "leader=%d is not in the set of replicas=%v",
 					leaderID, p.desc.replicas)
 				p.leaderNodeID = p.opts.NodeID
 				p.leaderStoreID = p.opts.StoreID
 			} else {
-				// A follower, which can learn about a leader before it learns
-				// about a config change that includes the leader in the set
-				// of replicas, so ignore.
+				// A follower can learn about a leader before it learns about the
+				// config change that adds the leader to the set of replicas. Ignore.
 				p.leaderNodeID = 0
 				p.leaderStoreID = 0
 			}
@@ -681,12 +682,12 @@ func (p *processorImpl) makeStateConsistentRaftMuLocked(
 	if p.enabledWhenLeader == NotEnabledWhenLeader {
 		return
 	}
-	if p.leader.rc != nil && myLeaderTerm > p.leader.term {
+	if p.leader.rc != nil && termChanged {
 		// Need to recreate the RangeController.
 		p.closeLeaderStateRaftMuLocked(ctx)
 	}
 	if p.leader.rc == nil {
-		p.createLeaderStateRaftMuLocked(ctx, myLeaderTerm, nextUnstableIndex)
+		p.createLeaderStateRaftMuLocked(ctx, term, nextUnstableIndex)
 		return
 	}
 	// Existing RangeController.
@@ -703,7 +704,6 @@ func (p *processorImpl) closeLeaderStateRaftMuLocked(ctx context.Context) {
 		return
 	}
 	p.leader.rc.CloseRaftMuLocked(ctx)
-	p.leader.term = 0
 
 	func() {
 		p.leader.pendingAdmittedMu.Lock()
@@ -723,7 +723,7 @@ func (p *processorImpl) createLeaderStateRaftMuLocked(
 	if p.leader.rc != nil {
 		panic("RangeController already exists")
 	}
-	p.leader.term = term
+	p.term = term
 	rc := p.opts.RangeControllerFactory.New(ctx, rangeControllerInitState{
 		replicaSet:  p.desc.replicas,
 		leaseholder: p.leaseholderID,
@@ -764,22 +764,20 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
 	// Grab the state we need in one shot after acquiring Replica mu.
 	var nextUnstableIndex uint64
 	var leaderID, leaseholderID roachpb.ReplicaID
-	var myLeaderTerm uint64
+	var term uint64
 	func() {
 		p.opts.Replica.MuLock()
 		defer p.opts.Replica.MuUnlock()
 		nextUnstableIndex = p.replMu.raftNode.NextUnstableIndexLocked()
 		leaderID = p.replMu.raftNode.LeaderLocked()
 		leaseholderID = p.opts.Replica.LeaseholderMuLocked()
-		if leaderID == p.opts.ReplicaID {
-			myLeaderTerm = p.replMu.raftNode.TermLocked()
-		}
+		term = p.replMu.raftNode.TermLocked()
 	}()
 	if len(e.Entries) > 0 {
 		nextUnstableIndex = e.Entries[0].Index
 	}
 	p.makeStateConsistentRaftMuLocked(
-		ctx, nextUnstableIndex, leaderID, leaseholderID, myLeaderTerm)
+		ctx, nextUnstableIndex, leaderID, leaseholderID, term)
 
 	if !p.isLeaderUsingV2ProcLocked() {
 		return
diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
index f0cc98aa3fe2..d8507aa5971e 100644
--- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
+++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
@@ -91,6 +91,7 @@ HandleRaftReady:
   RaftNode.NextUnstableIndexLocked() = 25
   RaftNode.LeaderLocked() = 10
   Replica.LeaseholderMuLocked
+  RaftNode.TermLocked() = 50
   Replica.MuUnlock
 .....
 AdmitRaftEntries:
@@ -117,6 +118,7 @@ HandleRaftReady:
   RaftNode.NextUnstableIndexLocked() = 26
   RaftNode.LeaderLocked() = 10
   Replica.LeaseholderMuLocked
+  RaftNode.TermLocked() = 50
   Replica.MuUnlock
 .....
 AdmitRaftEntries:
@@ -149,6 +151,7 @@ HandleRaftReady:
   RaftNode.NextUnstableIndexLocked() = 26
   RaftNode.LeaderLocked() = 11
   Replica.LeaseholderMuLocked
+  RaftNode.TermLocked() = 50
   Replica.MuUnlock
   Piggybacker.Add(n11, [r3,s11,5->11] admitted=t50/[24 25 25 25])
 .....
@@ -172,6 +175,7 @@ HandleRaftReady:
   RaftNode.NextUnstableIndexLocked() = 27
   RaftNode.LeaderLocked() = 11
   Replica.LeaseholderMuLocked
+  RaftNode.TermLocked() = 50
   Replica.MuUnlock
 .....
 AdmitRaftEntries:
@@ -190,6 +194,7 @@ HandleRaftReady:
   RaftNode.NextUnstableIndexLocked() = 27
   RaftNode.LeaderLocked() = 11
   Replica.LeaseholderMuLocked
+  RaftNode.TermLocked() = 50
   Replica.MuUnlock
 .....
 
@@ -212,6 +217,7 @@ HandleRaftReady:
   RaftNode.NextUnstableIndexLocked() = 27
   RaftNode.LeaderLocked() = 11
   Replica.LeaseholderMuLocked
+  RaftNode.TermLocked() = 50
   Replica.MuUnlock
   Piggybacker.Add(n11, [r3,s11,5->11] admitted=t50/[24 26 25 26])
 .....
@@ -233,6 +239,7 @@ HandleRaftReady:
   RaftNode.NextUnstableIndexLocked() = 27
   RaftNode.LeaderLocked() = 11
   Replica.LeaseholderMuLocked
+  RaftNode.TermLocked() = 50
   Replica.MuUnlock
   Piggybacker.Add(n11, [r3,s11,5->11] admitted=t50/[26 26 25 26])
 .....
@@ -256,6 +263,7 @@ HandleRaftReady:
   RaftNode.NextUnstableIndexLocked() = 28
   RaftNode.LeaderLocked() = 11
   Replica.LeaseholderMuLocked
+  RaftNode.TermLocked() = 50
   Replica.MuUnlock
 .....
 AdmitRaftEntries:
@@ -285,6 +293,7 @@ HandleRaftReady:
   RaftNode.NextUnstableIndexLocked() = 28
   RaftNode.LeaderLocked() = 11
   Replica.LeaseholderMuLocked
+  RaftNode.TermLocked() = 50
   Replica.MuUnlock
   Piggybacker.Add(n11, [r3,s11,5->11] admitted=t50/[26 26 26 26])
 .....
@@ -317,6 +326,7 @@ HandleRaftReady:
   RaftNode.NextUnstableIndexLocked() = 28
   RaftNode.LeaderLocked() = 11
   Replica.LeaseholderMuLocked
+  RaftNode.TermLocked() = 51
   Replica.MuUnlock
 .....
 
@@ -330,6 +340,7 @@ HandleRaftReady:
   RaftNode.NextUnstableIndexLocked() = 28
   RaftNode.LeaderLocked() = 11
   Replica.LeaseholderMuLocked
+  RaftNode.TermLocked() = 51
   Replica.MuUnlock
 .....
 AdmitRaftEntries:
@@ -355,6 +366,7 @@ HandleRaftReady:
   RaftNode.NextUnstableIndexLocked() = 28
   RaftNode.LeaderLocked() = 11
   Replica.LeaseholderMuLocked
+  RaftNode.TermLocked() = 51
   Replica.MuUnlock
   Piggybacker.Add(n11, [r3,s11,5->11] admitted=t51/[26 26 26 26])
 .....
@@ -791,6 +803,7 @@ HandleRaftReady:
   RaftNode.NextUnstableIndexLocked() = 28
   RaftNode.LeaderLocked() = 0
   Replica.LeaseholderMuLocked
+  RaftNode.TermLocked() = 51
   Replica.MuUnlock
   RangeController.CloseRaftMuLocked
 .....

From fb69429836e52f0be791823d9c9168c68bf593e7 Mon Sep 17 00:00:00 2001
From: Pavel Kalinnikov
Date: Wed, 11 Sep 2024 04:50:18 +0100
Subject: [PATCH 2/2] replica_rac2: clean up admitted vector sending

Epic: none
Release note: none
---
 .../kvflowcontrol/replica_rac2/processor.go | 81 +++++++++++++------
 1 file changed, 58 insertions(+), 23 deletions(-)

diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
index 997217b9d6c8..1aad44e77b0d 100644
--- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
+++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
@@ -23,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
+	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -782,33 +783,67 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
 	if !p.isLeaderUsingV2ProcLocked() {
 		return
 	}
-
-	if av, dirty := p.logTracker.admitted(true /* sched */); dirty {
-		if rc := p.leader.rc; rc != nil {
-			// If we are the leader, notify the RangeController about our replica's
-			// new admitted vector.
-			rc.AdmitRaftMuLocked(ctx, p.opts.ReplicaID, av)
-		} else if p.leaderNodeID != 0 {
-			// If the leader is known, piggyback the updated admitted vector to the
-			// message stream going to the leader node.
-			// TODO(pav-kv): must make sure the leader term is the same.
-			p.opts.AdmittedPiggybacker.Add(p.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
-				RangeID:       p.opts.RangeID,
-				ToStoreID:     p.leaderStoreID,
-				FromReplicaID: p.opts.ReplicaID,
-				ToReplicaID:   p.leaderID,
-				Admitted: kvflowcontrolpb.AdmittedState{
-					Term:     av.Term,
-					Admitted: av.Admitted[:],
-				}})
+	p.maybeSendAdmittedRaftMuLocked(ctx)
+	if rc := p.leader.rc; rc != nil {
+		if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
+			log.Errorf(ctx, "error handling raft event: %v", err)
 		}
 	}
+}
 
-	if p.leader.rc != nil {
-		if err := p.leader.rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
-			log.Errorf(ctx, "error handling raft event: %v", err)
-		}
+// maybeSendAdmittedRaftMuLocked sends the admitted vector to the leader if it
+// has been updated since the last send. If this replica is the leader, the
+// send is short-circuited to local processing.
+func (p *processorImpl) maybeSendAdmittedRaftMuLocked(ctx context.Context) {
+	// NB: this resets the scheduling bit in logTracker, which allows us to
+	// schedule this call again the next time the admitted vector is updated.
+	av, dirty := p.logTracker.admitted(true /* sched */)
+	// Don't send the admitted vector if it hasn't been updated since the last
+	// time it was sent.
+	if !dirty {
+		return
+	}
+	// If the admitted vector term is stale, don't send it: the leader will drop it.
+	if av.Term < p.term {
+		return
+	}
+	// The admitted vector term cannot outpace the raft term because raft would
+	// never accept a write that bumps its log's leader term without bumping the
+	// raft term first. We have held raftMu since the raft term was last read and
+	// the logTracker term was last updated (we are still in the Ready handler),
+	// so this invariant cannot be violated here.
+	//
+	// However, we check this only in test builds. There is no harm in sending an
+	// admitted vector with a future term. This informs the previous leader that
+	// there is a new leader, so it is free to react to this information (e.g.
+	// release all flow tokens, or step down from leadership).
+	if buildutil.CrdbTestBuild && av.Term > p.term {
+		panic(errors.AssertionFailedf(
+			"admitted vector ahead of raft term: admitted=%+v, term=%d", av, p.term))
+	}
+	// If we are the leader, send the admitted vector directly to the RangeController.
+	if rc := p.leader.rc; rc != nil {
+		rc.AdmitRaftMuLocked(ctx, p.opts.ReplicaID, av)
+		return
+	}
+	// If the leader is unknown, don't send the admitted vector to anyone. This
+	// should normally not happen here, since av.Term == p.term means we had at
+	// least one append from the leader, so we know it. There are cases, though
+	// (see Replica.forgetLeaderLocked), when raft deliberately forgets the leader.
+	if p.leaderNodeID == 0 {
+		return
 	}
+	// Piggyback the new admitted vector to the message stream going to the node
+	// containing the leader replica.
+	p.opts.AdmittedPiggybacker.Add(p.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
+		RangeID:       p.opts.RangeID,
+		ToStoreID:     p.leaderStoreID,
+		FromReplicaID: p.opts.ReplicaID,
+		ToReplicaID:   p.leaderID,
+		Admitted: kvflowcontrolpb.AdmittedState{
+			Term:     av.Term,
+			Admitted: av.Admitted[:],
+		}})
 }
 
 func (p *processorImpl) registerLogAppend(ctx context.Context, e rac2.RaftEvent) {
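
Patch 1's key invariant: processorImpl.term only moves forward (a regression is fatal), and a term bump while a RangeController exists forces it to be closed and recreated for the new term. The following standalone Go sketch illustrates just that decision logic; termTracker and its fields are hypothetical stand-ins for processorImpl, not code from this patch.

package main

import "fmt"

// termTracker is a hypothetical stand-in for the term-related state of
// processorImpl.
type termTracker struct {
	term     uint64 // latest raft term observed in a Ready cycle
	leaderRC bool   // stand-in for leader.rc != nil
}

// observe mirrors makeStateConsistentRaftMuLocked's term handling: panic on a
// term regression, and report whether the term advanced. A term advance while
// a RangeController exists means it must be closed and recreated for the new
// term (modeled here by clearing leaderRC).
func (t *termTracker) observe(term uint64) (termChanged bool) {
	if term < t.term {
		panic(fmt.Sprintf("term regressed from %d to %d", t.term, term))
	}
	termChanged = term > t.term
	if termChanged {
		t.term = term
		if t.leaderRC {
			t.leaderRC = false // the old RangeController is closed; recreate it
		}
	}
	return termChanged
}

func main() {
	t := termTracker{term: 50, leaderRC: true}
	fmt.Println(t.observe(50)) // false: same term, keep the RangeController
	fmt.Println(t.observe(51)) // true: new term, RangeController was closed
}

Tracking the term on processorImpl itself, rather than next to leader.rc, is what lets followers know the current term too, which patch 2 relies on.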
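Patch 2 replaces the old inline branch with an ordered ladder of early returns in maybeSendAdmittedRaftMuLocked: skip if the vector is unchanged, skip if its term is stale, deliver locally if this replica is the leader, skip if the leader is unknown, and otherwise piggyback to the leader's node. The sketch below mirrors that order using hypothetical stand-in types and names (admittedVec, decide); the real code works with the rac2 and kvflowcontrolpb types.

package main

import "fmt"

// admittedVec is a simplified stand-in for an admitted vector: a term plus
// admitted log indexes, one per priority.
type admittedVec struct {
	term     uint64
	admitted [4]uint64
}

// decide reproduces the order of the early returns in the patch.
func decide(av admittedVec, dirty bool, raftTerm uint64, isLeader bool, leaderNodeID uint64) string {
	if !dirty {
		return "skip: not updated since last send"
	}
	if av.term < raftTerm {
		return "skip: stale term, leader would drop it"
	}
	if isLeader {
		return "local: hand to RangeController directly"
	}
	if leaderNodeID == 0 {
		return "skip: leader unknown"
	}
	return fmt.Sprintf("piggyback to n%d", leaderNodeID)
}

func main() {
	av := admittedVec{term: 50, admitted: [4]uint64{26, 26, 26, 26}}
	fmt.Println(decide(av, true, 50, false, 11)) // piggyback to n11
	fmt.Println(decide(av, true, 51, false, 11)) // skip: stale term
	fmt.Println(decide(av, false, 50, true, 0))  // skip: not updated
}

The stale-term check is what resolves the TODO removed by this patch ("must make sure the leader term is the same"): a vector from an older term is filtered at the sender instead of being shipped to the leader only to be dropped there.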