Skip to content

Commit

Permalink
kvserver,rac2: initialize RaftNode early
Browse files Browse the repository at this point in the history
This commit moves the RaftNode interface initialization to the point
when raft.RawNode is created. This will be needed for correct
initialization of LogTracker from the initial raft state.

Epic: none
Release note: none
  • Loading branch information
pav-kv committed Sep 6, 2024
1 parent 930d463 commit 44155cc
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 43 deletions.
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/flow_control_replica_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,11 +482,6 @@ func (r *replicaForRACv2) MuUnlock() {
r.mu.Unlock()
}

// RaftNodeMuLocked implements replica_rac2.Replica.
func (r *replicaForRACv2) RaftNodeMuLocked() replica_rac2.RaftNode {
return replica_rac2.NewRaftNode(r.mu.internalRaftGroup)
}

// LeaseholderMuLocked implements replica_rac2.Replica.
func (r *replicaForRACv2) LeaseholderMuLocked() roachpb.ReplicaID {
return r.mu.state.Lease.Replica.ReplicaID
Expand Down
52 changes: 29 additions & 23 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand All @@ -42,12 +41,6 @@ type Replica interface {
MuLock()
// MuUnlock releases Replica.mu.
MuUnlock()
// RaftNodeMuLocked returns a reference to the RaftNode. It is only called
// after Processor knows the Replica is initialized.
//
// At least Replica mu is held. The caller does not make any claims about
// whether it holds raftMu or not.
RaftNodeMuLocked() RaftNode
// LeaseholderMuLocked returns the Replica's current knowledge of the
// leaseholder, which can be stale. It is only called after Processor
// knows the Replica is initialized.
Expand Down Expand Up @@ -304,6 +297,12 @@ type SideChannelInfoUsingRaftMessageRequest struct {
// NotEnabledWhenLeader, acquire Replica.mu and close
// replicaFlowControlIntegrationImpl (RACv1).
type Processor interface {
// InitRaftLocked is called when RaftNode is initialized for the Replica.
// NB: can be called twice before the Replica is fully initialized.
//
// Both Replica mu and raftMu are held.
InitRaftLocked(context.Context, RaftNode)

// OnDestroyRaftMuLocked is called when the Replica is being destroyed.
//
// We need to know when Replica.mu.destroyStatus is updated, so that we
Expand Down Expand Up @@ -503,6 +502,17 @@ func (p *processorImpl) isLeaderUsingV2ProcLocked() bool {
return p.mu.leader.rc != nil || p.mu.follower.isLeaderUsingV2Protocol
}

// InitRaftLocked implements Processor.
func (p *processorImpl) InitRaftLocked(ctx context.Context, rn RaftNode) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.Replica.MuAssertHeld()
if p.raftMu.replicas != nil {
log.Fatalf(ctx, "initializing RaftNode after replica is initialized")
}
p.raftMu.raftNode = rn
// TODO(pav-kv): initialize the LogTracker from RaftNode state.
}

// OnDestroyRaftMuLocked implements Processor.
func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) {
p.opts.Replica.RaftMuAssertHeld()
Expand Down Expand Up @@ -569,18 +579,17 @@ func (p *processorImpl) OnDescChangedLocked(
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.Replica.MuAssertHeld()
if p.raftMu.replicas == nil {
// Replica is initialized, in that we have a descriptor. Get the
// RaftNode.
p.raftMu.raftNode = p.opts.Replica.RaftNodeMuLocked()
p.raftMu.tenantID = tenantID
} else {
if p.raftMu.tenantID != tenantID {
panic(errors.AssertionFailedf("tenantId was changed from %s to %s",
p.raftMu.tenantID, tenantID))
initialization := p.raftMu.replicas == nil
if initialization {
// Replica is initialized, in that we now have a descriptor.
if p.raftMu.raftNode == nil {
panic(errors.AssertionFailedf("RaftNode is not initialized"))
}
p.raftMu.tenantID = tenantID
} else if p.raftMu.tenantID != tenantID {
panic(errors.AssertionFailedf("tenantId was changed from %s to %s",
p.raftMu.tenantID, tenantID))
}
initialization := p.raftMu.replicas == nil
p.raftMu.replicas = descToReplicaSet(desc)
p.raftMu.replicasChanged = true
// We need to promptly return tokens if some replicas have been removed,
Expand Down Expand Up @@ -718,15 +727,12 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
p.opts.Replica.RaftMuAssertHeld()
p.mu.Lock()
defer p.mu.Unlock()
if p.mu.destroyed {
// Skip if the replica is not initialized or already destroyed.
if p.raftMu.replicas == nil || p.mu.destroyed {
return
}
if p.raftMu.raftNode == nil {
if buildutil.CrdbTestBuild {
if len(e.Entries) > 0 {
panic(errors.AssertionFailedf("entries provided without raft node"))
}
}
log.Fatal(ctx, "RaftNode is not initialized")
return
}
// NB: we need to call makeStateConsistentRaftMuLockedProcLocked even if
Expand Down
22 changes: 9 additions & 13 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,7 @@ type testReplica struct {
var _ Replica = &testReplica{}

func newTestReplica(b *strings.Builder) *testReplica {
r := &testReplica{
b: b,
}
r.raftNode = &testRaftNode{
b: b,
r: r,
}
return r
return &testReplica{b: b}
}

func (r *testReplica) RaftMuAssertHeld() {
Expand All @@ -77,11 +70,6 @@ func (r *testReplica) MuUnlock() {
r.mu.Unlock()
}

func (r *testReplica) RaftNodeMuLocked() RaftNode {
fmt.Fprintf(r.b, " Replica.RaftNodeMuLocked\n")
return r.raftNode
}

func (r *testReplica) LeaseholderMuLocked() roachpb.ReplicaID {
fmt.Fprintf(r.b, " Replica.LeaseholderMuLocked\n")
r.mu.AssertHeld()
Expand Down Expand Up @@ -332,6 +320,14 @@ func TestProcessorBasic(t *testing.T) {
reset(enabledLevel)
return builderStr()

case "init-raft":
var mark rac2.LogMark
d.ScanArgs(t, "log-term", &mark.Term)
d.ScanArgs(t, "log-index", &mark.Index)
r.raftNode = &testRaftNode{b: &b, r: r, term: mark.Term, stableIndex: mark.Index}
p.InitRaftLocked(ctx, r.raftNode)
return builderStr()

case "set-raft-state":
if d.HasArg("admitted") {
var arg string
Expand Down
20 changes: 18 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ admit-for-eval pri=normal-pri
----
admitted: false err: <nil>

# No-op since RaftNode is not initialized.
handle-raft-ready-and-admit
----
HandleRaftReady:
Replica.RaftMuAssertHeld
.....

reset
----
n1,s2,r3: replica=5, tenant=4, enabled-level=not-enabled
Expand All @@ -41,6 +48,12 @@ get-enabled-level
----
enabled-level: v1-encoding

# TODO(pav-kv): initialize to a correct state.
init-raft log-term=0 log-index=20
----
Replica.RaftMuAssertHeld
Replica.MuAssertHeld

# Since stable-index is 20, admitted is slightly behind. The leader and
# leaseholder are both on replica-id 10.
set-raft-state leader=10 stable-index=20 next-unstable-index=25 leaseholder=10 admitted=[15,20,15,20]
Expand All @@ -60,7 +73,6 @@ on-desc-changed replicas=n11/s11/11
----
Replica.RaftMuAssertHeld
Replica.MuAssertHeld
Replica.RaftNodeMuLocked

# handleRaftReady. It thinks the leader is using v1, so there is no
# advancement of admitted, or submission of this entry for admission.
Expand Down Expand Up @@ -604,6 +616,11 @@ get-enabled-level
----
enabled-level: not-enabled

init-raft log-term=0 log-index=20
----
Replica.RaftMuAssertHeld
Replica.MuAssertHeld

set-raft-state leader=5 my-leader-term=50 leaseholder=5 stable-index=20 next-unstable-index=25 admitted=[15,20,15,20]
----
Raft: leader: 5 leaseholder: 5 stable: 20 next-unstable: 25 term: 50 admitted: [15, 20, 15, 20]
Expand All @@ -620,7 +637,6 @@ on-desc-changed replicas=n11/s11/11,n13/s13/13
----
Replica.RaftMuAssertHeld
Replica.MuAssertHeld
Replica.RaftNodeMuLocked

handle-raft-ready-and-admit
----
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error {
return err
}
r.mu.internalRaftGroup = rg
r.flowControlV2.InitRaftLocked(ctx, replica_rac2.NewRaftNode(rg))
return nil
}

Expand Down

0 comments on commit 44155cc

Please sign in to comment.