From 930d46344f5ecff0e665eb50495b2714799dc3f9 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 6 Sep 2024 19:54:39 +0100 Subject: [PATCH 1/2] replica_rac2: move RaftNode from kvserver Epic: none Release note: none --- pkg/kv/kvserver/BUILD.bazel | 1 - pkg/kv/kvserver/flow_control_replica_integration.go | 2 +- pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel | 3 +++ .../replica_rac2/raft_node.go} | 8 +++++--- 4 files changed, 9 insertions(+), 5 deletions(-) rename pkg/kv/kvserver/{flow_control_raft.go => kvflowcontrol/replica_rac2/raft_node.go} (91%) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 8ba2b2009d7d..4e11330871ee 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -12,7 +12,6 @@ go_library( "debug_print.go", "doc.go", "flow_control_integration.go", - "flow_control_raft.go", "flow_control_raft_transport.go", "flow_control_replica.go", "flow_control_replica_integration.go", diff --git a/pkg/kv/kvserver/flow_control_replica_integration.go b/pkg/kv/kvserver/flow_control_replica_integration.go index 95a7dc496ace..6b04aa6cf86a 100644 --- a/pkg/kv/kvserver/flow_control_replica_integration.go +++ b/pkg/kv/kvserver/flow_control_replica_integration.go @@ -484,7 +484,7 @@ func (r *replicaForRACv2) MuUnlock() { // RaftNodeMuLocked implements replica_rac2.Replica. func (r *replicaForRACv2) RaftNodeMuLocked() replica_rac2.RaftNode { - return raftNodeForRACv2{RawNode: r.mu.internalRaftGroup} + return replica_rac2.NewRaftNode(r.mu.internalRaftGroup) } // LeaseholderMuLocked implements replica_rac2.Replica. diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel index 15d56c79fe33..36b75457fca2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "close_scheduler.go", "doc.go", "processor.go", + "raft_node.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2", visibility = ["//visibility:public"], @@ -15,7 +16,9 @@ go_library( "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/kvflowcontrol/rac2", "//pkg/kv/kvserver/raftlog", + "//pkg/raft", "//pkg/raft/raftpb", + "//pkg/raft/tracker", "//pkg/roachpb", "//pkg/settings/cluster", "//pkg/util/admission/admissionpb", diff --git a/pkg/kv/kvserver/flow_control_raft.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go similarity index 91% rename from pkg/kv/kvserver/flow_control_raft.go rename to pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go index 9efb306537e2..5ccd96cacf76 100644 --- a/pkg/kv/kvserver/flow_control_raft.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go @@ -8,11 +8,10 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package kvserver +package replica_rac2 import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/raft/tracker" @@ -23,7 +22,10 @@ type raftNodeForRACv2 struct { *raft.RawNode } -var _ replica_rac2.RaftNode = raftNodeForRACv2{} +// NewRaftNode creates a RaftNode implementation from the given RawNode. +func NewRaftNode(rn *raft.RawNode) RaftNode { + return raftNodeForRACv2{RawNode: rn} +} func (rn raftNodeForRACv2) EnablePingForAdmittedLaggingLocked() { panic("TODO(pav-kv): implement") From 44155ccf682b471487c73f72db453c9ca274041b Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 27 Aug 2024 17:21:06 +0100 Subject: [PATCH 2/2] kvserver,rac2: initialize RaftNode early This commit moves the RaftNode interface initialization to the point when raft.RawNode is created. This will be needed for correct initialization of LogTracker from the initial raft state. Epic: none Release note: none --- .../flow_control_replica_integration.go | 5 -- .../kvflowcontrol/replica_rac2/processor.go | 52 +++++++++++-------- .../replica_rac2/processor_test.go | 22 ++++---- .../replica_rac2/testdata/processor | 20 ++++++- pkg/kv/kvserver/replica_init.go | 1 + 5 files changed, 57 insertions(+), 43 deletions(-) diff --git a/pkg/kv/kvserver/flow_control_replica_integration.go b/pkg/kv/kvserver/flow_control_replica_integration.go index 6b04aa6cf86a..bb0d7b46f504 100644 --- a/pkg/kv/kvserver/flow_control_replica_integration.go +++ b/pkg/kv/kvserver/flow_control_replica_integration.go @@ -482,11 +482,6 @@ func (r *replicaForRACv2) MuUnlock() { r.mu.Unlock() } -// RaftNodeMuLocked implements replica_rac2.Replica. -func (r *replicaForRACv2) RaftNodeMuLocked() replica_rac2.RaftNode { - return replica_rac2.NewRaftNode(r.mu.internalRaftGroup) -} - // LeaseholderMuLocked implements replica_rac2.Replica. func (r *replicaForRACv2) LeaseholderMuLocked() roachpb.ReplicaID { return r.mu.state.Lease.Replica.ReplicaID diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index a34c2cb41afe..17739c89302d 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" - "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -42,12 +41,6 @@ type Replica interface { MuLock() // MuUnlock releases Replica.mu. MuUnlock() - // RaftNodeMuLocked returns a reference to the RaftNode. It is only called - // after Processor knows the Replica is initialized. - // - // At least Replica mu is held. The caller does not make any claims about - // whether it holds raftMu or not. - RaftNodeMuLocked() RaftNode // LeaseholderMuLocked returns the Replica's current knowledge of the // leaseholder, which can be stale. It is only called after Processor // knows the Replica is initialized. @@ -304,6 +297,12 @@ type SideChannelInfoUsingRaftMessageRequest struct { // NotEnabledWhenLeader, acquire Replica.mu and close // replicaFlowControlIntegrationImpl (RACv1). type Processor interface { + // InitRaftLocked is called when RaftNode is initialized for the Replica. + // NB: can be called twice before the Replica is fully initialized. + // + // Both Replica mu and raftMu are held. + InitRaftLocked(context.Context, RaftNode) + // OnDestroyRaftMuLocked is called when the Replica is being destroyed. // // We need to know when Replica.mu.destroyStatus is updated, so that we @@ -503,6 +502,17 @@ func (p *processorImpl) isLeaderUsingV2ProcLocked() bool { return p.mu.leader.rc != nil || p.mu.follower.isLeaderUsingV2Protocol } +// InitRaftLocked implements Processor. +func (p *processorImpl) InitRaftLocked(ctx context.Context, rn RaftNode) { + p.opts.Replica.RaftMuAssertHeld() + p.opts.Replica.MuAssertHeld() + if p.raftMu.replicas != nil { + log.Fatalf(ctx, "initializing RaftNode after replica is initialized") + } + p.raftMu.raftNode = rn + // TODO(pav-kv): initialize the LogTracker from RaftNode state. +} + // OnDestroyRaftMuLocked implements Processor. func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) { p.opts.Replica.RaftMuAssertHeld() @@ -569,18 +579,17 @@ func (p *processorImpl) OnDescChangedLocked( ) { p.opts.Replica.RaftMuAssertHeld() p.opts.Replica.MuAssertHeld() - if p.raftMu.replicas == nil { - // Replica is initialized, in that we have a descriptor. Get the - // RaftNode. - p.raftMu.raftNode = p.opts.Replica.RaftNodeMuLocked() - p.raftMu.tenantID = tenantID - } else { - if p.raftMu.tenantID != tenantID { - panic(errors.AssertionFailedf("tenantId was changed from %s to %s", - p.raftMu.tenantID, tenantID)) + initialization := p.raftMu.replicas == nil + if initialization { + // Replica is initialized, in that we now have a descriptor. + if p.raftMu.raftNode == nil { + panic(errors.AssertionFailedf("RaftNode is not initialized")) } + p.raftMu.tenantID = tenantID + } else if p.raftMu.tenantID != tenantID { + panic(errors.AssertionFailedf("tenantId was changed from %s to %s", + p.raftMu.tenantID, tenantID)) } - initialization := p.raftMu.replicas == nil p.raftMu.replicas = descToReplicaSet(desc) p.raftMu.replicasChanged = true // We need to promptly return tokens if some replicas have been removed, @@ -718,15 +727,12 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2. p.opts.Replica.RaftMuAssertHeld() p.mu.Lock() defer p.mu.Unlock() - if p.mu.destroyed { + // Skip if the replica is not initialized or already destroyed. + if p.raftMu.replicas == nil || p.mu.destroyed { return } if p.raftMu.raftNode == nil { - if buildutil.CrdbTestBuild { - if len(e.Entries) > 0 { - panic(errors.AssertionFailedf("entries provided without raft node")) - } - } + log.Fatal(ctx, "RaftNode is not initialized") return } // NB: we need to call makeStateConsistentRaftMuLockedProcLocked even if diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index ad83e496263c..c327a294221f 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -49,14 +49,7 @@ type testReplica struct { var _ Replica = &testReplica{} func newTestReplica(b *strings.Builder) *testReplica { - r := &testReplica{ - b: b, - } - r.raftNode = &testRaftNode{ - b: b, - r: r, - } - return r + return &testReplica{b: b} } func (r *testReplica) RaftMuAssertHeld() { @@ -77,11 +70,6 @@ func (r *testReplica) MuUnlock() { r.mu.Unlock() } -func (r *testReplica) RaftNodeMuLocked() RaftNode { - fmt.Fprintf(r.b, " Replica.RaftNodeMuLocked\n") - return r.raftNode -} - func (r *testReplica) LeaseholderMuLocked() roachpb.ReplicaID { fmt.Fprintf(r.b, " Replica.LeaseholderMuLocked\n") r.mu.AssertHeld() @@ -332,6 +320,14 @@ func TestProcessorBasic(t *testing.T) { reset(enabledLevel) return builderStr() + case "init-raft": + var mark rac2.LogMark + d.ScanArgs(t, "log-term", &mark.Term) + d.ScanArgs(t, "log-index", &mark.Index) + r.raftNode = &testRaftNode{b: &b, r: r, term: mark.Term, stableIndex: mark.Index} + p.InitRaftLocked(ctx, r.raftNode) + return builderStr() + case "set-raft-state": if d.HasArg("admitted") { var arg string diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor index 3622bc89aa1b..75d61a722de5 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor @@ -24,6 +24,13 @@ admit-for-eval pri=normal-pri ---- admitted: false err: +# No-op since RaftNode is not initialized. +handle-raft-ready-and-admit +---- +HandleRaftReady: + Replica.RaftMuAssertHeld +..... + reset ---- n1,s2,r3: replica=5, tenant=4, enabled-level=not-enabled @@ -41,6 +48,12 @@ get-enabled-level ---- enabled-level: v1-encoding +# TODO(pav-kv): initialize to a correct state. +init-raft log-term=0 log-index=20 +---- + Replica.RaftMuAssertHeld + Replica.MuAssertHeld + # Since stable-index is 20, admitted is slightly behind. The leader and # leaseholder are both on replica-id 10. set-raft-state leader=10 stable-index=20 next-unstable-index=25 leaseholder=10 admitted=[15,20,15,20] @@ -60,7 +73,6 @@ on-desc-changed replicas=n11/s11/11 ---- Replica.RaftMuAssertHeld Replica.MuAssertHeld - Replica.RaftNodeMuLocked # handleRaftReady. It thinks the leader is using v1, so there is no # advancement of admitted, or submission of this entry for admission. @@ -604,6 +616,11 @@ get-enabled-level ---- enabled-level: not-enabled +init-raft log-term=0 log-index=20 +---- + Replica.RaftMuAssertHeld + Replica.MuAssertHeld + set-raft-state leader=5 my-leader-term=50 leaseholder=5 stable-index=20 next-unstable-index=25 admitted=[15,20,15,20] ---- Raft: leader: 5 leaseholder: 5 stable: 20 next-unstable: 25 term: 50 admitted: [15, 20, 15, 20] @@ -620,7 +637,6 @@ on-desc-changed replicas=n11/s11/11,n13/s13/13 ---- Replica.RaftMuAssertHeld Replica.MuAssertHeld - Replica.RaftNodeMuLocked handle-raft-ready-and-admit ---- diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index b5ac65931fc1..4fc8cf52224e 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -320,6 +320,7 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error { return err } r.mu.internalRaftGroup = rg + r.flowControlV2.InitRaftLocked(ctx, replica_rac2.NewRaftNode(rg)) return nil }