Skip to content

Commit

Permalink
Merge #129727
Browse files Browse the repository at this point in the history
129727: kvserver,rac2: initialize RaftNode early r=sumeerbhola a=pav-kv

This PR moves the `RaftNode` interface initialization to the point when `raft.RawNode` is created. This will be needed for correct initialization of `LogTracker` from the initial raft state.

The `RaftNode` interface is moved to `replica_rac2` integration package, to reduce cluttering of `kvserver`.

Part of #129508

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Sep 9, 2024
2 parents 32a1f71 + 44155cc commit 60313dc
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 47 deletions.
1 change: 0 additions & 1 deletion pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ go_library(
"debug_print.go",
"doc.go",
"flow_control_integration.go",
"flow_control_raft.go",
"flow_control_raft_transport.go",
"flow_control_replica.go",
"flow_control_replica_integration.go",
Expand Down
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/flow_control_replica_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,11 +482,6 @@ func (r *replicaForRACv2) MuUnlock() {
r.mu.Unlock()
}

// RaftNodeMuLocked implements replica_rac2.Replica.
func (r *replicaForRACv2) RaftNodeMuLocked() replica_rac2.RaftNode {
return raftNodeForRACv2{RawNode: r.mu.internalRaftGroup}
}

// LeaseholderMuLocked implements replica_rac2.Replica.
func (r *replicaForRACv2) LeaseholderMuLocked() roachpb.ReplicaID {
return r.mu.state.Lease.Replica.ReplicaID
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"close_scheduler.go",
"doc.go",
"processor.go",
"raft_node.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2",
visibility = ["//visibility:public"],
Expand All @@ -15,7 +16,9 @@ go_library(
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft",
"//pkg/raft/raftpb",
"//pkg/raft/tracker",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
Expand Down
52 changes: 29 additions & 23 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand All @@ -42,12 +41,6 @@ type Replica interface {
MuLock()
// MuUnlock releases Replica.mu.
MuUnlock()
// RaftNodeMuLocked returns a reference to the RaftNode. It is only called
// after Processor knows the Replica is initialized.
//
// At least Replica mu is held. The caller does not make any claims about
// whether it holds raftMu or not.
RaftNodeMuLocked() RaftNode
// LeaseholderMuLocked returns the Replica's current knowledge of the
// leaseholder, which can be stale. It is only called after Processor
// knows the Replica is initialized.
Expand Down Expand Up @@ -304,6 +297,12 @@ type SideChannelInfoUsingRaftMessageRequest struct {
// NotEnabledWhenLeader, acquire Replica.mu and close
// replicaFlowControlIntegrationImpl (RACv1).
type Processor interface {
// InitRaftLocked is called when RaftNode is initialized for the Replica.
// NB: can be called twice before the Replica is fully initialized.
//
// Both Replica mu and raftMu are held.
InitRaftLocked(context.Context, RaftNode)

// OnDestroyRaftMuLocked is called when the Replica is being destroyed.
//
// We need to know when Replica.mu.destroyStatus is updated, so that we
Expand Down Expand Up @@ -503,6 +502,17 @@ func (p *processorImpl) isLeaderUsingV2ProcLocked() bool {
return p.mu.leader.rc != nil || p.mu.follower.isLeaderUsingV2Protocol
}

// InitRaftLocked implements Processor.
func (p *processorImpl) InitRaftLocked(ctx context.Context, rn RaftNode) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.Replica.MuAssertHeld()
if p.raftMu.replicas != nil {
log.Fatalf(ctx, "initializing RaftNode after replica is initialized")
}
p.raftMu.raftNode = rn
// TODO(pav-kv): initialize the LogTracker from RaftNode state.
}

// OnDestroyRaftMuLocked implements Processor.
func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) {
p.opts.Replica.RaftMuAssertHeld()
Expand Down Expand Up @@ -569,18 +579,17 @@ func (p *processorImpl) OnDescChangedLocked(
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.Replica.MuAssertHeld()
if p.raftMu.replicas == nil {
// Replica is initialized, in that we have a descriptor. Get the
// RaftNode.
p.raftMu.raftNode = p.opts.Replica.RaftNodeMuLocked()
p.raftMu.tenantID = tenantID
} else {
if p.raftMu.tenantID != tenantID {
panic(errors.AssertionFailedf("tenantId was changed from %s to %s",
p.raftMu.tenantID, tenantID))
initialization := p.raftMu.replicas == nil
if initialization {
// Replica is initialized, in that we now have a descriptor.
if p.raftMu.raftNode == nil {
panic(errors.AssertionFailedf("RaftNode is not initialized"))
}
p.raftMu.tenantID = tenantID
} else if p.raftMu.tenantID != tenantID {
panic(errors.AssertionFailedf("tenantId was changed from %s to %s",
p.raftMu.tenantID, tenantID))
}
initialization := p.raftMu.replicas == nil
p.raftMu.replicas = descToReplicaSet(desc)
p.raftMu.replicasChanged = true
// We need to promptly return tokens if some replicas have been removed,
Expand Down Expand Up @@ -718,15 +727,12 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
p.opts.Replica.RaftMuAssertHeld()
p.mu.Lock()
defer p.mu.Unlock()
if p.mu.destroyed {
// Skip if the replica is not initialized or already destroyed.
if p.raftMu.replicas == nil || p.mu.destroyed {
return
}
if p.raftMu.raftNode == nil {
if buildutil.CrdbTestBuild {
if len(e.Entries) > 0 {
panic(errors.AssertionFailedf("entries provided without raft node"))
}
}
log.Fatal(ctx, "RaftNode is not initialized")
return
}
// NB: we need to call makeStateConsistentRaftMuLockedProcLocked even if
Expand Down
22 changes: 9 additions & 13 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,7 @@ type testReplica struct {
var _ Replica = &testReplica{}

func newTestReplica(b *strings.Builder) *testReplica {
r := &testReplica{
b: b,
}
r.raftNode = &testRaftNode{
b: b,
r: r,
}
return r
return &testReplica{b: b}
}

func (r *testReplica) RaftMuAssertHeld() {
Expand All @@ -77,11 +70,6 @@ func (r *testReplica) MuUnlock() {
r.mu.Unlock()
}

func (r *testReplica) RaftNodeMuLocked() RaftNode {
fmt.Fprintf(r.b, " Replica.RaftNodeMuLocked\n")
return r.raftNode
}

func (r *testReplica) LeaseholderMuLocked() roachpb.ReplicaID {
fmt.Fprintf(r.b, " Replica.LeaseholderMuLocked\n")
r.mu.AssertHeld()
Expand Down Expand Up @@ -332,6 +320,14 @@ func TestProcessorBasic(t *testing.T) {
reset(enabledLevel)
return builderStr()

case "init-raft":
var mark rac2.LogMark
d.ScanArgs(t, "log-term", &mark.Term)
d.ScanArgs(t, "log-index", &mark.Index)
r.raftNode = &testRaftNode{b: &b, r: r, term: mark.Term, stableIndex: mark.Index}
p.InitRaftLocked(ctx, r.raftNode)
return builderStr()

case "set-raft-state":
if d.HasArg("admitted") {
var arg string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver
package replica_rac2

import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/tracker"
Expand All @@ -23,7 +22,10 @@ type raftNodeForRACv2 struct {
*raft.RawNode
}

var _ replica_rac2.RaftNode = raftNodeForRACv2{}
// NewRaftNode creates a RaftNode implementation from the given RawNode.
func NewRaftNode(rn *raft.RawNode) RaftNode {
return raftNodeForRACv2{RawNode: rn}
}

func (rn raftNodeForRACv2) EnablePingForAdmittedLaggingLocked() {
panic("TODO(pav-kv): implement")
Expand Down
20 changes: 18 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ admit-for-eval pri=normal-pri
----
admitted: false err: <nil>

# No-op since RaftNode is not initialized.
handle-raft-ready-and-admit
----
HandleRaftReady:
Replica.RaftMuAssertHeld
.....

reset
----
n1,s2,r3: replica=5, tenant=4, enabled-level=not-enabled
Expand All @@ -41,6 +48,12 @@ get-enabled-level
----
enabled-level: v1-encoding

# TODO(pav-kv): initialize to a correct state.
init-raft log-term=0 log-index=20
----
Replica.RaftMuAssertHeld
Replica.MuAssertHeld

# Since stable-index is 20, admitted is slightly behind. The leader and
# leaseholder are both on replica-id 10.
set-raft-state leader=10 stable-index=20 next-unstable-index=25 leaseholder=10 admitted=[15,20,15,20]
Expand All @@ -60,7 +73,6 @@ on-desc-changed replicas=n11/s11/11
----
Replica.RaftMuAssertHeld
Replica.MuAssertHeld
Replica.RaftNodeMuLocked

# handleRaftReady. It thinks the leader is using v1, so there is no
# advancement of admitted, or submission of this entry for admission.
Expand Down Expand Up @@ -604,6 +616,11 @@ get-enabled-level
----
enabled-level: not-enabled

init-raft log-term=0 log-index=20
----
Replica.RaftMuAssertHeld
Replica.MuAssertHeld

set-raft-state leader=5 my-leader-term=50 leaseholder=5 stable-index=20 next-unstable-index=25 admitted=[15,20,15,20]
----
Raft: leader: 5 leaseholder: 5 stable: 20 next-unstable: 25 term: 50 admitted: [15, 20, 15, 20]
Expand All @@ -620,7 +637,6 @@ on-desc-changed replicas=n11/s11/11,n13/s13/13
----
Replica.RaftMuAssertHeld
Replica.MuAssertHeld
Replica.RaftNodeMuLocked

handle-raft-ready-and-admit
----
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error {
return err
}
r.mu.internalRaftGroup = rg
r.flowControlV2.InitRaftLocked(ctx, replica_rac2.NewRaftNode(rg))
return nil
}

Expand Down

0 comments on commit 60313dc

Please sign in to comment.