Skip to content

Commit

Permalink
kvserver,rac2: use RaftMaxInflightBytes when force-flushing
Browse files Browse the repository at this point in the history
This limit is roughly respected, in that force-flush is paused when the
limit is exceeded, and unpaused when ready handling indicates that the
replicaSendStream is back within the limit.

Informs cockroachdb#135814

Epic: none

Release note: None
  • Loading branch information
sumeerbhola committed Dec 7, 2024
1 parent e22e07a commit c1cf6fb
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 17 deletions.
90 changes: 76 additions & 14 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ type ReplicaStateInfo struct {
// (Match, Next) is in-flight.
Match uint64
Next uint64
// InflightBytes is the bytes in (Match, Next).
InflightBytes uint64
}

// sendQueueStatRefreshInterval is the interval at which the send queue stats
Expand Down Expand Up @@ -563,8 +565,16 @@ type RangeControllerOptions struct {
EvalWaitMetrics *EvalWaitMetrics
RangeControllerMetrics *RangeControllerMetrics
WaitForEvalConfig *WaitForEvalConfig
ReplicaMutexAsserter ReplicaMutexAsserter
Knobs *kvflowcontrol.TestingKnobs
// RaftMaxInflightBytes is a soft limit on the maximum inflight bytes when
// using MsgAppPull mode. Currently, the RangeController only attempts to
// respect this when force-flushing a replicaSendStream, since the typical
// production configuration of this value (32MiB) is larger than the typical
// production configuration of the shared regular token pool (16MiB), so
// attempting to respect this when doing non-force-flush sends is
// unnecessary.
RaftMaxInflightBytes uint64
ReplicaMutexAsserter ReplicaMutexAsserter
Knobs *kvflowcontrol.TestingKnobs
}

// RangeControllerInitState is the initial state at the time of creation.
Expand Down Expand Up @@ -683,6 +693,9 @@ func NewRangeController(
if log.V(1) {
log.VInfof(ctx, 1, "r%v creating range controller", o.RangeID)
}
if o.RaftMaxInflightBytes == 0 {
o.RaftMaxInflightBytes = math.MaxUint64
}
rc := &rangeController{
opts: o,
term: init.Term,
Expand Down Expand Up @@ -873,10 +886,15 @@ type raftEventForReplica struct {
// Reminder: (ReplicaStateInfo.Match, ReplicaStateInfo.Next) are in-flight.
// nextRaftIndex is where the next entry will be added.
//
// ReplicaStateInfo.{State, Match, InflightBytes} are the latest state.
// ReplicaStateInfo.Next represents the state preceding this raft event,
// i.e., it will be altered by sendingEntries. Note that InflightBytes
// already incorporates sendingEntries -- we could choose to iterate over
// the sending entries in constructRaftEventForReplica and compensate for
// them, but we don't bother.
//
// nextRaftIndex also represents the state preceding this event, and will be
// altered by newEntries.
//
// createSendStream is set to true if the replicaSendStream should be
// (re)created.
Expand Down Expand Up @@ -1047,9 +1065,10 @@ func constructRaftEventForReplica(
refr := raftEventForReplica{
mode: mode,
replicaStateInfo: ReplicaStateInfo{
State: latestReplicaStateInfo.State,
Match: latestReplicaStateInfo.Match,
Next: next,
State: latestReplicaStateInfo.State,
Match: latestReplicaStateInfo.Match,
Next: next,
InflightBytes: latestReplicaStateInfo.InflightBytes,
},
nextRaftIndex: raftEventAppendState.rewoundNextRaftIndex,
newEntries: raftEventAppendState.newEntries,
Expand Down Expand Up @@ -2110,6 +2129,10 @@ type replicaSendStream struct {
tokenWatcherHandle SendTokenWatcherHandle
deductedForSchedulerTokens kvflowcontrol.Tokens
}
// inflightBytes is the sum of bytes that are inflight, i.e., in
// (ReplicaStateInfo.Match,ReplicaStateInfo.Next).
inflightBytes uint64

// TODO(sumeer): remove closed. Whenever a replicaSendStream is closed it
// is also no longer referenced by replicaState. The only motivation for
// closed is that replicaSendStream.Notify calls directly into
Expand Down Expand Up @@ -2514,6 +2537,10 @@ func (rs *replicaState) scheduledRaftMuLocked(
rss.tryHandleModeChangeRaftMuAndStreamLocked(ctx, mode, false, false)
return false, false
}
if rss.mu.sendQueue.forceFlushStopIndex.active() &&
rss.exceedInflightBytesThresholdRaftMuAndStreamLocked() {
return false, false
}
// 4MB. Don't want to hog the scheduler thread for too long.
const MaxBytesToSend kvflowcontrol.Tokens = 4 << 20
bytesToSend := MaxBytesToSend
Expand Down Expand Up @@ -2561,6 +2588,7 @@ func (rs *replicaState) scheduledRaftMuLocked(
rs.sendStream = nil
return false, true
}
rss.updateInflightRaftMuAndStreamLocked(slice)
rss.dequeueFromQueueAndSendRaftMuAndStreamLocked(ctx, msg)
isEmpty := rss.isEmptySendQueueStreamLocked()
if isEmpty {
Expand All @@ -2577,12 +2605,18 @@ func (rs *replicaState) scheduledRaftMuLocked(
// next tick. We accept a latency hiccup in this case for now.
rss.mu.sendQueue.forceFlushStopIndex = 0
}
forceFlushNeedsToPause := false
if rss.mu.sendQueue.forceFlushStopIndex.active() &&
rss.exceedInflightBytesThresholdRaftMuAndStreamLocked() {
forceFlushNeedsToPause = true
}
watchForTokens :=
!rss.mu.sendQueue.forceFlushStopIndex.active() && rss.mu.sendQueue.deductedForSchedulerTokens == 0
if watchForTokens {
rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx)
}
return !watchForTokens, false
scheduleAgain = !watchForTokens && !forceFlushNeedsToPause
return scheduleAgain, false
}

func (rs *replicaState) closeSendStreamRaftMuLocked(ctx context.Context) {
Expand Down Expand Up @@ -2641,6 +2675,9 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
wasEmptySendQ := rss.isEmptySendQueueStreamLocked()
rss.tryHandleModeChangeRaftMuAndStreamLocked(
ctx, event.mode, wasEmptySendQ, directive.forceFlushStopIndex.active())
// Use the latest inflight bytes, since it reflects the advancing Match.
wasExceedingInflightBytesThreshold := rss.exceedInflightBytesThresholdRaftMuAndStreamLocked()
rss.mu.inflightBytes = event.replicaStateInfo.InflightBytes
if event.mode == MsgAppPull {
// MsgAppPull mode (i.e., followers). Populate sendingEntries.
n := len(event.sendingEntries)
Expand All @@ -2649,12 +2686,18 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
rss.parent.desc.ReplicaID == rss.parent.parent.opts.LocalReplicaID))
}
if directive.forceFlushStopIndex.active() {
// Must have a send-queue, so sendingEntries should stay empty (these
// will be queued).
if !rss.mu.sendQueue.forceFlushStopIndex.active() {
// Must have a send-queue, so sendingEntries should stay empty
// (these will be queued).
rss.startForceFlushRaftMuAndStreamLocked(ctx, directive.forceFlushStopIndex)
} else if rss.mu.sendQueue.forceFlushStopIndex != directive.forceFlushStopIndex {
rss.mu.sendQueue.forceFlushStopIndex = directive.forceFlushStopIndex
} else {
if rss.mu.sendQueue.forceFlushStopIndex != directive.forceFlushStopIndex {
rss.mu.sendQueue.forceFlushStopIndex = directive.forceFlushStopIndex
}
if wasExceedingInflightBytesThreshold &&
!rss.exceedInflightBytesThresholdRaftMuAndStreamLocked() {
rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID)
}
}
} else {
// INVARIANT: !directive.forceFlushStopIndex.active()
Expand Down Expand Up @@ -2797,6 +2840,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
return false,
errors.Errorf("SendMsgApp could not send for replica %d", rss.parent.desc.ReplicaID)
}
rss.updateInflightRaftMuAndStreamLocked(slice)
rss.parent.parent.opts.MsgAppSender.SendMsgApp(ctx, msg, false)
}

Expand All @@ -2816,6 +2860,16 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
return transitionedSendQState, nil
}

// updateInflightRaftMuAndStreamLocked adds the payload size of every entry in
// ls to rss.mu.inflightBytes. Callers invoke it for the slice of entries they
// are about to send in a MsgApp, so that the inflight-bytes threshold check
// sees those bytes.
//
// Both raftMu and rss.mu must be held; this is asserted.
func (rss *replicaSendStream) updateInflightRaftMuAndStreamLocked(ls raft.LogSlice) {
	rss.parent.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
	rss.mu.AssertHeld()
	// Fetch the entries once and iterate over that slice, instead of calling
	// ls.Entries() a second time in the range clause.
	entries := ls.Entries()
	for i := range entries {
		// NB: raft.payloadSize also uses len(raftpb.Entry.Data).
		rss.mu.inflightBytes += uint64(len(entries[i].Data))
	}
}

func (rss *replicaSendStream) tryHandleModeChangeRaftMuAndStreamLocked(
ctx context.Context, mode RaftMsgAppMode, isEmptySendQ bool, toldToForceFlush bool,
) {
Expand Down Expand Up @@ -2857,14 +2911,22 @@ func (rss *replicaSendStream) tryHandleModeChangeRaftMuAndStreamLocked(
}
}

// exceedInflightBytesThresholdRaftMuAndStreamLocked reports whether the bytes
// currently tracked as inflight for this stream are strictly greater than the
// RaftMaxInflightBytes configured on the owning RangeController's options.
//
// Both raftMu and rss.mu must be held; this is asserted.
func (rss *replicaSendStream) exceedInflightBytesThresholdRaftMuAndStreamLocked() bool {
	rc := rss.parent.parent
	rc.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
	rss.mu.AssertHeld()
	inflight := rss.mu.inflightBytes
	limit := rc.opts.RaftMaxInflightBytes
	return limit < inflight
}

// startForceFlushRaftMuAndStreamLocked begins force-flushing the send-queue
// up to forceFlushStopIndex. It:
//   - increments the ForceFlushedScheduledCount metric,
//   - records forceFlushStopIndex on the send-queue,
//   - schedules the replica for processing, but only if the stream is not
//     already over the inflight-bytes threshold, and
//   - stops any attempt to empty the send-queue via the token watcher.
//
// When the stream is over the threshold, force-flush starts out paused: no
// scheduling happens here, and ready handling schedules the replica once the
// inflight bytes fall back within the limit.
//
// Both raftMu and rss.mu must be held; this is asserted.
func (rss *replicaSendStream) startForceFlushRaftMuAndStreamLocked(
	ctx context.Context, forceFlushStopIndex forceFlushStopIndex,
) {
	rss.parent.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
	rss.mu.AssertHeld()
	rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Inc(1)
	rss.mu.sendQueue.forceFlushStopIndex = forceFlushStopIndex
	// Schedule only while within the inflight-bytes limit. An unconditional
	// scheduleReplica call here would defeat the pause and would schedule
	// twice when under the limit.
	if !rss.exceedInflightBytesThresholdRaftMuAndStreamLocked() {
		rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID)
	}
	rss.stopAttemptingToEmptySendQueueViaWatcherRaftMuAndStreamLocked(ctx, false)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,7 @@ type RangeControllerFactoryImpl struct {
scheduler rac2.Scheduler
sendTokenWatcher *rac2.SendTokenWatcher
waitForEvalConfig *rac2.WaitForEvalConfig
raftMaxInflightBytes uint64
knobs *kvflowcontrol.TestingKnobs
}

Expand All @@ -1223,6 +1224,7 @@ func NewRangeControllerFactoryImpl(
scheduler rac2.Scheduler,
sendTokenWatcher *rac2.SendTokenWatcher,
waitForEvalConfig *rac2.WaitForEvalConfig,
raftMaxInflightBytes uint64,
knobs *kvflowcontrol.TestingKnobs,
) RangeControllerFactoryImpl {
return RangeControllerFactoryImpl{
Expand All @@ -1234,6 +1236,7 @@ func NewRangeControllerFactoryImpl(
scheduler: scheduler,
sendTokenWatcher: sendTokenWatcher,
waitForEvalConfig: waitForEvalConfig,
raftMaxInflightBytes: raftMaxInflightBytes,
knobs: knobs,
}
}
Expand All @@ -1258,6 +1261,7 @@ func (f RangeControllerFactoryImpl) New(
EvalWaitMetrics: f.evalWaitMetrics,
RangeControllerMetrics: f.rangeControllerMetrics,
WaitForEvalConfig: f.waitForEvalConfig,
RaftMaxInflightBytes: f.raftMaxInflightBytes,
ReplicaMutexAsserter: state.muAsserter,
Knobs: f.knobs,
},
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ func MakeReplicaStateInfos(rn *raft.RawNode, infoMap map[roachpb.ReplicaID]rac2.
clear(infoMap)
rn.WithBasicProgress(func(peerID raftpb.PeerID, progress tracker.BasicProgress) {
infoMap[roachpb.ReplicaID(peerID)] = rac2.ReplicaStateInfo{
Match: progress.Match,
Next: progress.Next,
State: progress.State,
State: progress.State,
Match: progress.Match,
Next: progress.Next,
InflightBytes: progress.InflightBytes,
}
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1579,6 +1579,7 @@ func NewStore(
(*racV2Scheduler)(s.scheduler),
s.cfg.KVFlowSendTokenWatcher,
s.cfg.KVFlowWaitForEvalConfig,
s.cfg.RaftMaxInflightBytes,
s.TestingKnobs().FlowControlTestingKnobs,
)

Expand Down

0 comments on commit c1cf6fb

Please sign in to comment.