kvserver: wire up replica_rac2 range controller factory #130197

Merged
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/flow_control_raft.go
@@ -11,9 +11,11 @@
 package kvserver

 import (
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
 	"github.com/cockroachdb/cockroach/pkg/raft"
 	"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
+	"github.com/cockroachdb/cockroach/pkg/raft/tracker"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 )

@@ -56,3 +58,19 @@ func (rn raftNodeForRACv2) SetAdmittedLocked([raftpb.NumPriorities]uint64) raftpb.Message
 func (rn raftNodeForRACv2) StepMsgAppRespForAdmittedLocked(m raftpb.Message) error {
 	return rn.RawNode.Step(m)
 }

+func (rn raftNodeForRACv2) FollowerStateRaftMuLocked(
+	replicaID roachpb.ReplicaID,
+) rac2.FollowerStateInfo {
+	// TODO(pav-kv): this is a temporary implementation.
+	status := rn.Status()
+	if progress, ok := status.Progress[raftpb.PeerID(replicaID)]; ok {
+		return rac2.FollowerStateInfo{
+			State: progress.State,
+			Match: progress.Match,
+			Next:  progress.Next,
+		}
+	}
+
+	return rac2.FollowerStateInfo{State: tracker.StateProbe}
+}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/syncutil",
54 changes: 42 additions & 12 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
@@ -102,6 +103,9 @@ type RaftNode interface {

 	// Read-only methods.

+	// rac2.RaftInterface abstracts the raft.RawNode for use in the
+	// RangeController.
+	rac2.RaftInterface
 	// TermLocked returns the current term of this replica.
 	TermLocked() uint64
 	// LeaderLocked returns the current known leader. This state can advance
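
Embedding rac2.RaftInterface in RaftNode means any RaftNode — such as the raftNodeForRACv2 wrapper from flow_control_raft.go above — can be handed to the RangeController wherever a rac2.RaftInterface is expected, which is what the processor does below via the raftInterface init-state field. A compile-time sketch of that relationship, assuming only the declarations shown in this diff:

```go
// Holds by construction: RaftNode's method set includes the embedded
// rac2.RaftInterface, so the assignment compiles.
var _ rac2.RaftInterface = RaftNode(nil)
```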
@@ -181,12 +185,18 @@ type ACWorkQueue interface {
 	Admit(ctx context.Context, entry EntryForAdmission) bool
 }

-// TODO(sumeer): temporary placeholder, until RangeController is more fully
-// fleshed out.
 type rangeControllerInitState struct {
 	replicaSet    rac2.ReplicaSet
 	leaseholder   roachpb.ReplicaID
 	nextRaftIndex uint64
+	// These fields are required options for the RangeController specific to the
+	// replica and range, rather than the store or node, so we pass them as part
+	// of the range controller init state.
+	rangeID         roachpb.RangeID
+	tenantID        roachpb.TenantID
+	localReplicaID  roachpb.ReplicaID
+	raftInterface   rac2.RaftInterface
+	admittedTracker rac2.AdmittedTracker
 }

 // RangeControllerFactory abstracts RangeController creation for testing.
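
Because RangeControllerFactory is an interface, tests can substitute a double for RangeControllerFactoryImpl. A hypothetical sketch of such a fake — not code from this PR, and the real test harness in this package may differ — recording the per-range init state the processor passes in:

```go
import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
)

// fakeRangeControllerFactory is an illustrative test double for the
// RangeControllerFactory interface.
type fakeRangeControllerFactory struct {
	created []rangeControllerInitState
}

// New records the init state so a test can assert on the wiring (rangeID,
// tenantID, raftInterface, admittedTracker, ...) and returns a stub (nil)
// RangeController.
func (f *fakeRangeControllerFactory) New(
	ctx context.Context, state rangeControllerInitState,
) rac2.RangeController {
	f.created = append(f.created, state)
	return nil
}
```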
@@ -689,9 +699,14 @@ func (p *processorImpl) createLeaderStateRaftMuLockedProcLocked(
 	p.mu.leader.rcReferenceUpdateMu.Lock()
 	defer p.mu.leader.rcReferenceUpdateMu.Unlock()
 	p.mu.leader.rc = p.opts.RangeControllerFactory.New(ctx, rangeControllerInitState{
-		replicaSet:    p.raftMu.replicas,
-		leaseholder:   p.mu.leaseholderID,
-		nextRaftIndex: nextUnstableIndex,
+		replicaSet:      p.raftMu.replicas,
+		leaseholder:     p.mu.leaseholderID,
+		nextRaftIndex:   nextUnstableIndex,
+		rangeID:         p.opts.RangeID,
+		tenantID:        p.raftMu.tenantID,
+		localReplicaID:  p.opts.ReplicaID,
+		raftInterface:   p.raftMu.raftNode,
+		admittedTracker: p,
 	})
 }()
 p.mu.leader.term = term
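
Note the processor passes itself as the admittedTracker: processorImpl implements GetAdmitted (next hunk), which is presumably what rac2.AdmittedTracker requires. If so, the wiring could be checked at compile time with a sketch like:

```go
var _ rac2.AdmittedTracker = (*processorImpl)(nil)
```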
@@ -985,34 +1000,49 @@ func (p *processorImpl) GetAdmitted(replicaID roachpb.ReplicaID) rac2.AdmittedVector
 	return rac2.AdmittedVector{}
 }

-// RangeControllerFactoryImpl implements RangeControllerFactory.
-//
-// TODO(sumeer): replace with real implementation once RangeController impl is
-// ready.
+// RangeControllerFactoryImpl implements the RangeControllerFactory interface.
 var _ RangeControllerFactory = RangeControllerFactoryImpl{}

+// RangeControllerFactoryImpl is a factory for creating RangeControllers. There
+// should be one per store. When a new RangeController is created, the caller
+// provides the range-specific information as part of rangeControllerInitState.
 type RangeControllerFactoryImpl struct {
+	clock                      *hlc.Clock
 	evalWaitMetrics            *rac2.EvalWaitMetrics
 	streamTokenCounterProvider *rac2.StreamTokenCounterProvider
+	closeTimerScheduler        rac2.ProbeToCloseTimerScheduler
 }

 func NewRangeControllerFactoryImpl(
+	clock *hlc.Clock,
 	evalWaitMetrics *rac2.EvalWaitMetrics,
 	streamTokenCounterProvider *rac2.StreamTokenCounterProvider,
+	closeTimerScheduler rac2.ProbeToCloseTimerScheduler,
 ) RangeControllerFactoryImpl {
 	return RangeControllerFactoryImpl{
+		clock:                      clock,
 		evalWaitMetrics:            evalWaitMetrics,
 		streamTokenCounterProvider: streamTokenCounterProvider,
+		closeTimerScheduler:        closeTimerScheduler,
 	}
 }

 // New creates a new RangeController.
 func (f RangeControllerFactoryImpl) New(
 	ctx context.Context, state rangeControllerInitState,
 ) rac2.RangeController {
 	return rac2.NewRangeController(
 		ctx,
-		// TODO(kvoli): Thread through other required init state and options.
 		rac2.RangeControllerOptions{
-			SSTokenCounter:  f.streamTokenCounterProvider,
-			EvalWaitMetrics: f.evalWaitMetrics,
+			RangeID:             state.rangeID,
+			TenantID:            state.tenantID,
+			LocalReplicaID:      state.localReplicaID,
+			SSTokenCounter:      f.streamTokenCounterProvider,
+			RaftInterface:       state.raftInterface,
+			Clock:               f.clock,
+			CloseTimerScheduler: f.closeTimerScheduler,
+			AdmittedTracker:     state.admittedTracker,
+			EvalWaitMetrics:     f.evalWaitMetrics,
 		},
 		rac2.RangeControllerInitState{
 			ReplicaSet: state.replicaSet,
@@ -160,6 +160,15 @@ func (rn *testRaftNode) StepMsgAppRespForAdmittedLocked(msg raftpb.Message) error
 	return nil
 }

+func (rn *testRaftNode) FollowerStateRaftMuLocked(
+	replicaID roachpb.ReplicaID,
+) rac2.FollowerStateInfo {
+	rn.r.mu.AssertHeld()
+	fmt.Fprintf(rn.b, " RaftNode.FollowerStateRaftMuLocked(%v)\n", replicaID)
+	// TODO(kvoli,sumeerbhola): implement.
+	return rac2.FollowerStateInfo{}
+}
+
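
The stub above only records the call. One way the TODO might later be filled in for tests — an assumption, not part of this PR — is to let each test configure per-replica state via a hypothetical followerStates map on testRaftNode:

```go
// Sketch only: followerStates would be a new (hypothetical) field on
// testRaftNode, consulted by FollowerStateRaftMuLocked instead of
// returning the zero value.
func (rn *testRaftNode) setFollowerState(
	replicaID roachpb.ReplicaID, info rac2.FollowerStateInfo,
) {
	if rn.followerStates == nil {
		rn.followerStates = map[roachpb.ReplicaID]rac2.FollowerStateInfo{}
	}
	rn.followerStates[replicaID] = info
}
```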
 func admittedString(admitted [raftpb.NumPriorities]uint64) string {
 	return fmt.Sprintf("[%d, %d, %d, %d]", admitted[0], admitted[1], admitted[2], admitted[3])
 }
22 changes: 10 additions & 12 deletions pkg/kv/kvserver/replica_init.go
@@ -226,18 +226,16 @@ func newUninitializedReplicaWithoutRaftGroup(
 	)
 	r.raftMu.flowControlLevel = racV2EnabledWhenLeaderLevel(r.raftCtx, store.cfg.Settings)
 	r.flowControlV2 = replica_rac2.NewProcessor(replica_rac2.ProcessorOptions{
-		NodeID:                 store.NodeID(),
-		StoreID:                r.StoreID(),
-		RangeID:                r.RangeID,
-		ReplicaID:              r.replicaID,
-		Replica:                (*replicaForRACv2)(r),
-		RaftScheduler:          r.store.scheduler,
-		AdmittedPiggybacker:    r.store.cfg.KVFlowAdmittedPiggybacker,
-		ACWorkQueue:            r.store.cfg.KVAdmissionController,
-		EvalWaitMetrics:        r.store.cfg.KVFlowEvalWaitMetrics,
-		RangeControllerFactory: replica_rac2.NewRangeControllerFactoryImpl(
-			r.store.cfg.KVFlowEvalWaitMetrics,
-			r.store.cfg.KVFlowStreamTokenProvider),
+		NodeID:                 store.NodeID(),
+		StoreID:                r.StoreID(),
+		RangeID:                r.RangeID,
+		ReplicaID:              r.replicaID,
+		Replica:                (*replicaForRACv2)(r),
+		RaftScheduler:          r.store.scheduler,
+		AdmittedPiggybacker:    r.store.cfg.KVFlowAdmittedPiggybacker,
+		ACWorkQueue:            r.store.cfg.KVAdmissionController,
+		EvalWaitMetrics:        r.store.cfg.KVFlowEvalWaitMetrics,
+		RangeControllerFactory: r.store.kvflowRangeControllerFactory,
 		EnabledWhenLeaderLevel: r.raftMu.flowControlLevel,
 	})
 	return r
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/store.go
@@ -916,6 +916,12 @@ type Store struct {
 	// transport is connected to, and is used by the canonical
 	// replicaFlowControlIntegration implementation.
 	raftTransportForFlowControl raftTransportForFlowControl
+
+	// kvflowRangeControllerFactory is used by replication admission control
+	// (flow control) V2 to create new range controllers, which mediate the
+	// flow of requests to replicas.
+	kvflowRangeControllerFactory replica_rac2.RangeControllerFactory
+
 	// metricsMu protects the collection and update of engine metrics.
 	metricsMu syncutil.Mutex

@@ -1545,6 +1551,17 @@ func NewStore(
 		cfg.RaftSchedulerConcurrency, cfg.RaftSchedulerShardSize, cfg.RaftSchedulerConcurrencyPriority,
 		cfg.RaftElectionTimeoutTicks)

+	// kvflowRangeControllerFactory depends on the raft scheduler, so it must
+	// be created per store, rather than per node like the other replication
+	// admission control (flow control) v2 components.
+	s.kvflowRangeControllerFactory = replica_rac2.NewRangeControllerFactoryImpl(
+		s.Clock(),
+		s.cfg.KVFlowEvalWaitMetrics,
+		s.cfg.KVFlowStreamTokenProvider,
+		replica_rac2.NewStreamCloseScheduler(
+			s.stopper, timeutil.DefaultTimeSource{}, s.scheduler),
+	)
+
 	// Run a log SyncWaiter loop for every 32 raft scheduler goroutines.
 	// Experiments on c5d.12xlarge instances (48 vCPUs, the largest single-socket
 	// instance AWS offers) show that with fewer SyncWaiters, raft log callback