diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 8ba2b2009d7d..4e11330871ee 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -12,7 +12,6 @@ go_library( "debug_print.go", "doc.go", "flow_control_integration.go", - "flow_control_raft.go", "flow_control_raft_transport.go", "flow_control_replica.go", "flow_control_replica_integration.go", diff --git a/pkg/kv/kvserver/flow_control_replica_integration.go b/pkg/kv/kvserver/flow_control_replica_integration.go index 95a7dc496ace..bb0d7b46f504 100644 --- a/pkg/kv/kvserver/flow_control_replica_integration.go +++ b/pkg/kv/kvserver/flow_control_replica_integration.go @@ -482,11 +482,6 @@ func (r *replicaForRACv2) MuUnlock() { r.mu.Unlock() } -// RaftNodeMuLocked implements replica_rac2.Replica. -func (r *replicaForRACv2) RaftNodeMuLocked() replica_rac2.RaftNode { - return raftNodeForRACv2{RawNode: r.mu.internalRaftGroup} -} - // LeaseholderMuLocked implements replica_rac2.Replica. func (r *replicaForRACv2) LeaseholderMuLocked() roachpb.ReplicaID { return r.mu.state.Lease.Replica.ReplicaID diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel index 15d56c79fe33..36b75457fca2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "close_scheduler.go", "doc.go", "processor.go", + "raft_node.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2", visibility = ["//visibility:public"], @@ -15,7 +16,9 @@ go_library( "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/kvflowcontrol/rac2", "//pkg/kv/kvserver/raftlog", + "//pkg/raft", "//pkg/raft/raftpb", + "//pkg/raft/tracker", "//pkg/roachpb", "//pkg/settings/cluster", "//pkg/util/admission/admissionpb", diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index a34c2cb41afe..17739c89302d 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" - "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -42,12 +41,6 @@ type Replica interface { MuLock() // MuUnlock releases Replica.mu. MuUnlock() - // RaftNodeMuLocked returns a reference to the RaftNode. It is only called - // after Processor knows the Replica is initialized. - // - // At least Replica mu is held. The caller does not make any claims about - // whether it holds raftMu or not. - RaftNodeMuLocked() RaftNode // LeaseholderMuLocked returns the Replica's current knowledge of the // leaseholder, which can be stale. It is only called after Processor // knows the Replica is initialized. @@ -304,6 +297,12 @@ type SideChannelInfoUsingRaftMessageRequest struct { // NotEnabledWhenLeader, acquire Replica.mu and close // replicaFlowControlIntegrationImpl (RACv1). type Processor interface { + // InitRaftLocked is called when RaftNode is initialized for the Replica. + // NB: can be called twice before the Replica is fully initialized. + // + // Both Replica mu and raftMu are held. + InitRaftLocked(context.Context, RaftNode) + // OnDestroyRaftMuLocked is called when the Replica is being destroyed. // // We need to know when Replica.mu.destroyStatus is updated, so that we @@ -503,6 +502,17 @@ func (p *processorImpl) isLeaderUsingV2ProcLocked() bool { return p.mu.leader.rc != nil || p.mu.follower.isLeaderUsingV2Protocol } +// InitRaftLocked implements Processor. +func (p *processorImpl) InitRaftLocked(ctx context.Context, rn RaftNode) { + p.opts.Replica.RaftMuAssertHeld() + p.opts.Replica.MuAssertHeld() + if p.raftMu.replicas != nil { + log.Fatalf(ctx, "initializing RaftNode after replica is initialized") + } + p.raftMu.raftNode = rn + // TODO(pav-kv): initialize the LogTracker from RaftNode state. +} + // OnDestroyRaftMuLocked implements Processor. func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) { p.opts.Replica.RaftMuAssertHeld() @@ -569,18 +579,17 @@ func (p *processorImpl) OnDescChangedLocked( ) { p.opts.Replica.RaftMuAssertHeld() p.opts.Replica.MuAssertHeld() - if p.raftMu.replicas == nil { - // Replica is initialized, in that we have a descriptor. Get the - // RaftNode. - p.raftMu.raftNode = p.opts.Replica.RaftNodeMuLocked() - p.raftMu.tenantID = tenantID - } else { - if p.raftMu.tenantID != tenantID { - panic(errors.AssertionFailedf("tenantId was changed from %s to %s", - p.raftMu.tenantID, tenantID)) + initialization := p.raftMu.replicas == nil + if initialization { + // Replica is initialized, in that we now have a descriptor. + if p.raftMu.raftNode == nil { + panic(errors.AssertionFailedf("RaftNode is not initialized")) } + p.raftMu.tenantID = tenantID + } else if p.raftMu.tenantID != tenantID { + panic(errors.AssertionFailedf("tenantId was changed from %s to %s", + p.raftMu.tenantID, tenantID)) } - initialization := p.raftMu.replicas == nil p.raftMu.replicas = descToReplicaSet(desc) p.raftMu.replicasChanged = true // We need to promptly return tokens if some replicas have been removed, @@ -718,15 +727,12 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2. p.opts.Replica.RaftMuAssertHeld() p.mu.Lock() defer p.mu.Unlock() - if p.mu.destroyed { + // Skip if the replica is not initialized or already destroyed. + if p.raftMu.replicas == nil || p.mu.destroyed { return } if p.raftMu.raftNode == nil { - if buildutil.CrdbTestBuild { - if len(e.Entries) > 0 { - panic(errors.AssertionFailedf("entries provided without raft node")) - } - } + log.Fatal(ctx, "RaftNode is not initialized") return } // NB: we need to call makeStateConsistentRaftMuLockedProcLocked even if diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index ad83e496263c..c327a294221f 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -49,14 +49,7 @@ type testReplica struct { var _ Replica = &testReplica{} func newTestReplica(b *strings.Builder) *testReplica { - r := &testReplica{ - b: b, - } - r.raftNode = &testRaftNode{ - b: b, - r: r, - } - return r + return &testReplica{b: b} } func (r *testReplica) RaftMuAssertHeld() { @@ -77,11 +70,6 @@ func (r *testReplica) MuUnlock() { r.mu.Unlock() } -func (r *testReplica) RaftNodeMuLocked() RaftNode { - fmt.Fprintf(r.b, " Replica.RaftNodeMuLocked\n") - return r.raftNode -} - func (r *testReplica) LeaseholderMuLocked() roachpb.ReplicaID { fmt.Fprintf(r.b, " Replica.LeaseholderMuLocked\n") r.mu.AssertHeld() @@ -332,6 +320,14 @@ func TestProcessorBasic(t *testing.T) { reset(enabledLevel) return builderStr() + case "init-raft": + var mark rac2.LogMark + d.ScanArgs(t, "log-term", &mark.Term) + d.ScanArgs(t, "log-index", &mark.Index) + r.raftNode = &testRaftNode{b: &b, r: r, term: mark.Term, stableIndex: mark.Index} + p.InitRaftLocked(ctx, r.raftNode) + return builderStr() + case "set-raft-state": if d.HasArg("admitted") { var arg string diff --git a/pkg/kv/kvserver/flow_control_raft.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go similarity index 91% rename from pkg/kv/kvserver/flow_control_raft.go rename to pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go index 9efb306537e2..5ccd96cacf76 100644 --- a/pkg/kv/kvserver/flow_control_raft.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go @@ -8,11 +8,10 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package kvserver +package replica_rac2 import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/raft/tracker" @@ -23,7 +22,10 @@ type raftNodeForRACv2 struct { *raft.RawNode } -var _ replica_rac2.RaftNode = raftNodeForRACv2{} +// NewRaftNode creates a RaftNode implementation from the given RawNode. +func NewRaftNode(rn *raft.RawNode) RaftNode { + return raftNodeForRACv2{RawNode: rn} +} func (rn raftNodeForRACv2) EnablePingForAdmittedLaggingLocked() { panic("TODO(pav-kv): implement") diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor index 3622bc89aa1b..75d61a722de5 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor @@ -24,6 +24,13 @@ admit-for-eval pri=normal-pri ---- admitted: false err: +# No-op since RaftNode is not initialized. +handle-raft-ready-and-admit +---- +HandleRaftReady: + Replica.RaftMuAssertHeld +..... + reset ---- n1,s2,r3: replica=5, tenant=4, enabled-level=not-enabled @@ -41,6 +48,12 @@ get-enabled-level ---- enabled-level: v1-encoding +# TODO(pav-kv): initialize to a correct state. +init-raft log-term=0 log-index=20 +---- + Replica.RaftMuAssertHeld + Replica.MuAssertHeld + # Since stable-index is 20, admitted is slightly behind. The leader and # leaseholder are both on replica-id 10. set-raft-state leader=10 stable-index=20 next-unstable-index=25 leaseholder=10 admitted=[15,20,15,20] @@ -60,7 +73,6 @@ on-desc-changed replicas=n11/s11/11 ---- Replica.RaftMuAssertHeld Replica.MuAssertHeld - Replica.RaftNodeMuLocked # handleRaftReady. It thinks the leader is using v1, so there is no # advancement of admitted, or submission of this entry for admission. @@ -604,6 +616,11 @@ get-enabled-level ---- enabled-level: not-enabled +init-raft log-term=0 log-index=20 +---- + Replica.RaftMuAssertHeld + Replica.MuAssertHeld + set-raft-state leader=5 my-leader-term=50 leaseholder=5 stable-index=20 next-unstable-index=25 admitted=[15,20,15,20] ---- Raft: leader: 5 leaseholder: 5 stable: 20 next-unstable: 25 term: 50 admitted: [15, 20, 15, 20] @@ -620,7 +637,6 @@ on-desc-changed replicas=n11/s11/11,n13/s13/13 ---- Replica.RaftMuAssertHeld Replica.MuAssertHeld - Replica.RaftNodeMuLocked handle-raft-ready-and-admit ---- diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index b5ac65931fc1..4fc8cf52224e 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -320,6 +320,7 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error { return err } r.mu.internalRaftGroup = rg + r.flowControlV2.InitRaftLocked(ctx, replica_rac2.NewRaftNode(rg)) return nil }