diff --git a/pkg/kv/kvserver/flow_control_replica_integration_test.go b/pkg/kv/kvserver/flow_control_replica_integration_test.go index ad2bca7a3491..1d2029cb70ad 100644 --- a/pkg/kv/kvserver/flow_control_replica_integration_test.go +++ b/pkg/kv/kvserver/flow_control_replica_integration_test.go @@ -413,9 +413,9 @@ func newMockFlowHandle( func (m *mockFlowHandle) Admit( ctx context.Context, pri admissionpb.WorkPriority, ct time.Time, -) error { +) (bool, error) { m.t.Fatal("unimplemented") - return nil + return false, nil } func (m *mockFlowHandle) DeductTokensFor( diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index 922354f2b62e..b33eb3899e5a 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -310,29 +310,41 @@ func (n *controllerImpl) AdmitKVWork( // to continue even when throttling since there are often significant // number of tokens available. if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() { - if !bypassAdmission && - kvflowcontrol.Enabled.Get(&n.settings.SV) && - n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings) { + var admitted bool + attemptFlowControl := kvflowcontrol.Enabled.Get(&n.settings.SV) && + n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings) + if attemptFlowControl && !bypassAdmission { kvflowHandle, found := n.kvflowHandles.Lookup(ba.RangeID) if !found { return Handle{}, nil } - if err := kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime)); err != nil { + var err error + admitted, err = kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime)) + if err != nil { return Handle{}, err + } else if admitted { + // NB: It's possible for us to be waiting for available flow tokens + // for a different set of streams that the ones we'll eventually + // deduct tokens from, if the range experiences a split between now + // and the point of deduction. That's ok, there's no strong + // synchronization needed between these two points. + ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{ + AdmissionPriority: int32(admissionInfo.Priority), + AdmissionCreateTime: admissionInfo.CreateTime, + AdmissionOriginNode: n.nodeID.Get(), + } } - // NB: It's possible for us to be waiting for available flow tokens - // for a different set of streams that the ones we'll eventually - // deduct tokens from, if the range experiences a split between now - // and the point of deduction. That's ok, there's no strong - // synchronization needed between these two points. - ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{ - AdmissionPriority: int32(admissionInfo.Priority), - AdmissionCreateTime: admissionInfo.CreateTime, - AdmissionOriginNode: n.nodeID.Get(), - } - } else { + + } + // If flow control is disabled or if work bypasses flow control, we still + // subject it above-raft, leaseholder-only IO admission control. + if !attemptFlowControl || !admitted { storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID)) if storeAdmissionQ != nil { + // NB: Even though we would know here we're bypassing admission (via + // `bypassAdmission`), we still have to explicitly invoke `.Admit()`. + // We do it for correct token accounting (i.e. we deduct tokens without + // blocking). storeWorkHandle, err := storeAdmissionQ.Admit( ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo}) if err != nil { diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index 7e8c5d9fdb32..0046a858f26a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -60,12 +60,6 @@ const ( // virtually enqueued in below-raft admission queues and dequeued in // priority order, but only empty elastic flow token buckets above-raft will // block further elastic traffic from being admitted. - // - // TODO(irfansharif): We're potentially risking OOMs doing all this tracking - // for regular work, without coalescing state. With a bit of plumbing, for - // requests that bypass flow control we could fallback to using the non-AC - // raft encodings and avoid the potential OOMs. Address this as part of - // #95563. ApplyToElastic ModeT = iota // ApplyToAll uses flow control for both elastic and regular traffic, // i.e. all work will wait for flow tokens to be available. @@ -117,11 +111,12 @@ type Tokens int64 // Controller provides flow control for replication traffic in KV, held at the // node-level. type Controller interface { - // Admit seeks admission to replicate data, regardless of size, for work - // with the given priority, create-time, and over the given stream. This - // blocks until there are flow tokens available or the stream disconnects, - // subject to context cancellation. - Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) error + // Admit seeks admission to replicate data, regardless of size, for work with + // the given priority, create-time, and over the given stream. This blocks + // until there are flow tokens available or the stream disconnects, subject to + // context cancellation. This returns true if the request was admitted through + // flow control. Ignore the first return type if err != nil + Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) (admitted bool, _ error) // DeductTokens deducts (without blocking) flow tokens for replicating work // with given priority over the given stream. Requests are expected to // have been Admit()-ed first. @@ -158,10 +153,11 @@ type Controller interface { // given priority, takes log position into account -- see // kvflowcontrolpb.AdmittedRaftLogEntries for more details). type Handle interface { - // Admit seeks admission to replicate data, regardless of size, for work - // with the given priority and create-time. This blocks until there are - // flow tokens available for all connected streams. - Admit(context.Context, admissionpb.WorkPriority, time.Time) error + // Admit seeks admission to replicate data, regardless of size, for work with + // the given priority and create-time. This blocks until there are flow tokens + // available for all connected streams. This returns true if the request was + // admitted through flow control. Ignore the first return type if err != nil. + Admit(context.Context, admissionpb.WorkPriority, time.Time) (admitted bool, _ error) // DeductTokensFor deducts (without blocking) flow tokens for replicating // work with given priority along connected streams. The deduction is // tracked with respect to the specific raft log position it's expecting it diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go index b263edff077b..b6ac9307674a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -136,7 +136,7 @@ func (c *Controller) Admit( pri admissionpb.WorkPriority, _ time.Time, connection kvflowcontrol.ConnectedStream, -) error { +) (bool, error) { class := admissionpb.WorkClassFromPri(pri) c.metrics.onWaiting(class) @@ -148,12 +148,11 @@ func (c *Controller) Admit( c.mu.Unlock() tokens := b.tokens(class) - if tokens > 0 || - // In addition to letting requests through when there are tokens - // being available, we'll also let them through if we're not - // applying flow control to their specific work class. - c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass { - + // In addition to letting requests through when there are tokens + // being available, we'll also let them through if we're not + // applying flow control to their specific work class. + bypass := c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass + if tokens > 0 || bypass { if log.ExpensiveLogEnabled(ctx, 2) { log.Infof(ctx, "admitted request (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)", pri, connection.Stream(), tokens, c.clock.PhysicalTime().Sub(tstart), c.mode()) @@ -179,7 +178,10 @@ func (c *Controller) Admit( b.signal() // signal a waiter, if any c.metrics.onAdmitted(class, c.clock.PhysicalTime().Sub(tstart)) - return nil + if bypass { + return false, nil + } + return true, nil } if !logged && log.ExpensiveLogEnabled(ctx, 2) { @@ -192,12 +194,12 @@ func (c *Controller) Admit( case <-b.wait(): // wait for a signal case <-connection.Disconnected(): c.metrics.onBypassed(class, c.clock.PhysicalTime().Sub(tstart)) - return nil + return true, nil case <-ctx.Done(): if ctx.Err() != nil { c.metrics.onErrored(class, c.clock.PhysicalTime().Sub(tstart)) } - return ctx.Err() + return false, ctx.Err() } } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go index 1fdcf876aefd..56f05ce9fe80 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_test.go @@ -196,7 +196,9 @@ func TestInspectController(t *testing.T) { // Set up a single connected stream, s1/t1, and ensure it shows up in the // Inspect() state. - require.NoError(t, controller.Admit(ctx, admissionpb.NormalPri, time.Time{}, makeConnectedStream(1))) + admitted, err := controller.Admit(ctx, admissionpb.NormalPri, time.Time{}, makeConnectedStream(1)) + require.NoError(t, err) + require.True(t, admitted) require.Len(t, controller.Inspect(ctx), 1) require.Equal(t, controller.Inspect(ctx)[0], makeInspectStream(1, 8<<20 /* 8MiB */, 16<<20 /* 16 MiB */)) @@ -211,7 +213,9 @@ func TestInspectController(t *testing.T) { // Connect another stream, s1/s2, and ensure it shows up in the Inspect() // state. - require.NoError(t, controller.Admit(ctx, admissionpb.BulkNormalPri, time.Time{}, makeConnectedStream(2))) + admitted, err = controller.Admit(ctx, admissionpb.BulkNormalPri, time.Time{}, makeConnectedStream(2)) + require.NoError(t, err) + require.True(t, admitted) require.Len(t, controller.Inspect(ctx), 2) require.Equal(t, controller.Inspect(ctx)[1], makeInspectStream(2, 8<<20 /* 8MiB */, 16<<20 /* 16 MiB */)) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go index 21da940bad8b..40efbcbf0d6a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go @@ -79,7 +79,9 @@ func New( var _ kvflowcontrol.Handle = &Handle{} // Admit is part of the kvflowcontrol.Handle interface. -func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct time.Time) error { +func (h *Handle) Admit( + ctx context.Context, pri admissionpb.WorkPriority, ct time.Time, +) (bool, error) { if h == nil { // TODO(irfansharif): This can happen if we're proposing immediately on // a newly split off RHS that doesn't know it's a leader yet (so we @@ -92,14 +94,14 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim // As for cluster settings that disable flow control entirely or only // for regular traffic, that can be dealt with at the caller by not // calling .Admit() and ensuring we use the right raft entry encodings. - return nil + return false, nil } h.mu.Lock() if h.mu.closed { h.mu.Unlock() log.Errorf(ctx, "operating on a closed handle") - return nil + return false, nil } // NB: We're using a copy-on-write scheme elsewhere to maintain this slice @@ -115,15 +117,20 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim h.metrics.onWaiting(class) tstart := h.clock.PhysicalTime() + // NB: We track whether the last stream was subject to flow control, this + // helps us decide later if we should be deducting tokens for this work. + var admitted bool for _, c := range connections { - if err := h.controller.Admit(ctx, pri, ct, c); err != nil { + var err error + admitted, err = h.controller.Admit(ctx, pri, ct, c) + if err != nil { h.metrics.onErrored(class, h.clock.PhysicalTime().Sub(tstart)) - return err + return false, err } } h.metrics.onAdmitted(class, h.clock.PhysicalTime().Sub(tstart)) - return nil + return admitted, nil } // DeductTokensFor is part of the kvflowcontrol.Handle interface. diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go index 2d0201acb203..fe9429448a64 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go @@ -110,7 +110,9 @@ func TestHandleAdmit(t *testing.T) { // the goroutine is blocked. admitCh := make(chan struct{}) go func() { - require.NoError(t, handle.Admit(ctx, admissionpb.NormalPri, time.Time{})) + admitted, err := handle.Admit(ctx, admissionpb.NormalPri, time.Time{}) + require.NoError(t, err) + require.True(t, admitted) close(admitCh) }() @@ -189,16 +191,26 @@ func TestFlowControlMode(t *testing.T) { handle.ConnectStream(ctx, pos(0), stream) handle.DeductTokensFor(ctx, admissionpb.NormalPri, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */)) + mode := tc.mode // copy to avoid nogo error + // Invoke .Admit() for {regular,elastic} work in a separate // goroutines, and test below whether the goroutines are blocked. regularAdmitCh := make(chan struct{}) elasticAdmitCh := make(chan struct{}) go func() { - require.NoError(t, handle.Admit(ctx, admissionpb.NormalPri, time.Time{})) + admitted, err := handle.Admit(ctx, admissionpb.NormalPri, time.Time{}) + require.NoError(t, err) + if mode == kvflowcontrol.ApplyToElastic { + require.False(t, admitted) + } else { + require.True(t, admitted) + } close(regularAdmitCh) }() go func() { - require.NoError(t, handle.Admit(ctx, admissionpb.BulkNormalPri, time.Time{})) + admitted, err := handle.Admit(ctx, admissionpb.BulkNormalPri, time.Time{}) + require.NoError(t, err) + require.True(t, admitted) close(elasticAdmitCh) }() diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go index 69878f5adb15..dca22334c424 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go @@ -26,8 +26,10 @@ type Noop struct{} var _ kvflowcontrol.Handle = Noop{} // Admit is part of the kvflowcontrol.Handle interface. -func (n Noop) Admit(ctx context.Context, priority admissionpb.WorkPriority, time time.Time) error { - return nil +func (n Noop) Admit( + ctx context.Context, priority admissionpb.WorkPriority, time time.Time, +) (bool, error) { + return false, nil } // DeductTokensFor is part of the kvflowcontrol.Handle interface. diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 9b3503d17d7d..2f58569658b1 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -1166,8 +1166,8 @@ var _ kvflowcontrol.Handle = &testFlowTokenHandle{} func (t *testFlowTokenHandle) Admit( ctx context.Context, priority admissionpb.WorkPriority, t2 time.Time, -) error { - return nil +) (bool, error) { + return false, nil } func (t *testFlowTokenHandle) DeductTokensFor(