Skip to content

Commit

Permalink
kvadmission: fix handling of non-elastic work with flow control
Browse files Browse the repository at this point in the history
Previously, when we were skipping flow control for regular work, we were
still encoding raft messages with AC encoding. This would enqueue
essentially no-op AC work items below raft, increasing the possibility
of OOM. This is because we would not be pacing the rate of regular work
by below-raft admission rates but it would be consuming memory in
below-raft work queues.

This change skips encoding raft messages with AC encoding for cases
where we bypass flow control. Additionally, we still subject such work
to above-raft IO admission control, if enabled.

Informs cockroachdb#104154.

Release note: None
  • Loading branch information
aadityasondhi committed Aug 30, 2023
1 parent 36bd829 commit 9789da7
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 57 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/flow_control_replica_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,9 @@ func newMockFlowHandle(

func (m *mockFlowHandle) Admit(
ctx context.Context, pri admissionpb.WorkPriority, ct time.Time,
) error {
) (bool, error) {
m.t.Fatal("unimplemented")
return nil
return false, nil
}

func (m *mockFlowHandle) DeductTokensFor(
Expand Down
38 changes: 23 additions & 15 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,27 +310,35 @@ func (n *controllerImpl) AdmitKVWork(
// to continue even when throttling since there are often significant
// number of tokens available.
if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() {
if !bypassAdmission &&
kvflowcontrol.Enabled.Get(&n.settings.SV) &&
n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings) {
var admitted bool
attemptFlowControl := kvflowcontrol.Enabled.Get(&n.settings.SV) &&
n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings)
if attemptFlowControl && !bypassAdmission {
kvflowHandle, found := n.kvflowHandles.Lookup(ba.RangeID)
if !found {
return Handle{}, nil
}
if err := kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime)); err != nil {
var err error
admitted, err = kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime))
if err != nil {
return Handle{}, err
} else if admitted {
// NB: It's possible for us to be waiting for available flow tokens
// for a different set of streams that the ones we'll eventually
// deduct tokens from, if the range experiences a split between now
// and the point of deduction. That's ok, there's no strong
// synchronization needed between these two points.
ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{
AdmissionPriority: int32(admissionInfo.Priority),
AdmissionCreateTime: admissionInfo.CreateTime,
AdmissionOriginNode: n.nodeID.Get(),
}
}
// NB: It's possible for us to be waiting for available flow tokens
// for a different set of streams that the ones we'll eventually
// deduct tokens from, if the range experiences a split between now
// and the point of deduction. That's ok, there's no strong
// synchronization needed between these two points.
ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{
AdmissionPriority: int32(admissionInfo.Priority),
AdmissionCreateTime: admissionInfo.CreateTime,
AdmissionOriginNode: n.nodeID.Get(),
}
} else {

}
// If flow control is disabled or if work bypasses flow control, we still
// subject it above-raft, leaseholder-only IO admission control.
if !attemptFlowControl || !admitted {
storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID))
if storeAdmissionQ != nil {
storeWorkHandle, err := storeAdmissionQ.Admit(
Expand Down
26 changes: 11 additions & 15 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ const (
// virtually enqueued in below-raft admission queues and dequeued in
// priority order, but only empty elastic flow token buckets above-raft will
// block further elastic traffic from being admitted.
//
// TODO(irfansharif): We're potentially risking OOMs doing all this tracking
// for regular work, without coalescing state. With a bit of plumbing, for
// requests that bypass flow control we could fallback to using the non-AC
// raft encodings and avoid the potential OOMs. Address this as part of
// #95563.
ApplyToElastic ModeT = iota
// ApplyToAll uses flow control for both elastic and regular traffic,
// i.e. all work will wait for flow tokens to be available.
Expand Down Expand Up @@ -117,11 +111,12 @@ type Tokens int64
// Controller provides flow control for replication traffic in KV, held at the
// node-level.
type Controller interface {
// Admit seeks admission to replicate data, regardless of size, for work
// with the given priority, create-time, and over the given stream. This
// blocks until there are flow tokens available or the stream disconnects,
// subject to context cancellation.
Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) error
// Admit seeks admission to replicate data, regardless of size, for work with
// the given priority, create-time, and over the given stream. This blocks
// until there are flow tokens available or the stream disconnects, subject to
// context cancellation. This returns true if the request was admitted through
// flow control. Ignore the first return type if err != nil
Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) (admitted bool, _ error)
// DeductTokens deducts (without blocking) flow tokens for replicating work
// with given priority over the given stream. Requests are expected to
// have been Admit()-ed first.
Expand Down Expand Up @@ -158,10 +153,11 @@ type Controller interface {
// given priority, takes log position into account -- see
// kvflowcontrolpb.AdmittedRaftLogEntries for more details).
type Handle interface {
// Admit seeks admission to replicate data, regardless of size, for work
// with the given priority and create-time. This blocks until there are
// flow tokens available for all connected streams.
Admit(context.Context, admissionpb.WorkPriority, time.Time) error
// Admit seeks admission to replicate data, regardless of size, for work with
// the given priority and create-time. This blocks until there are flow tokens
// available for all connected streams. This returns true if the request was
// admitted through flow control. Ignore the first return type if err != nil.
Admit(context.Context, admissionpb.WorkPriority, time.Time) (admitted bool, _ error)
// DeductTokensFor deducts (without blocking) flow tokens for replicating
// work with given priority along connected streams. The deduction is
// tracked with respect to the specific raft log position it's expecting it
Expand Down
22 changes: 12 additions & 10 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (c *Controller) Admit(
pri admissionpb.WorkPriority,
_ time.Time,
connection kvflowcontrol.ConnectedStream,
) error {
) (bool, error) {
class := admissionpb.WorkClassFromPri(pri)
c.metrics.onWaiting(class)

Expand All @@ -148,12 +148,11 @@ func (c *Controller) Admit(
c.mu.Unlock()

tokens := b.tokens(class)
if tokens > 0 ||
// In addition to letting requests through when there are tokens
// being available, we'll also let them through if we're not
// applying flow control to their specific work class.
c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass {

// In addition to letting requests through when there are tokens
// being available, we'll also let them through if we're not
// applying flow control to their specific work class.
bypass := c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass
if tokens > 0 || bypass {
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "admitted request (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)",
pri, connection.Stream(), tokens, c.clock.PhysicalTime().Sub(tstart), c.mode())
Expand All @@ -179,7 +178,10 @@ func (c *Controller) Admit(

b.signal() // signal a waiter, if any
c.metrics.onAdmitted(class, c.clock.PhysicalTime().Sub(tstart))
return nil
if bypass {
return false, nil
}
return true, nil
}

if !logged && log.ExpensiveLogEnabled(ctx, 2) {
Expand All @@ -192,12 +194,12 @@ func (c *Controller) Admit(
case <-b.wait(): // wait for a signal
case <-connection.Disconnected():
c.metrics.onBypassed(class, c.clock.PhysicalTime().Sub(tstart))
return nil
return true, nil
case <-ctx.Done():
if ctx.Err() != nil {
c.metrics.onErrored(class, c.clock.PhysicalTime().Sub(tstart))
}
return ctx.Err()
return false, ctx.Err()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ func TestInspectController(t *testing.T) {

// Set up a single connected stream, s1/t1, and ensure it shows up in the
// Inspect() state.
require.NoError(t, controller.Admit(ctx, admissionpb.NormalPri, time.Time{}, makeConnectedStream(1)))
admitted, err := controller.Admit(ctx, admissionpb.NormalPri, time.Time{}, makeConnectedStream(1))
require.NoError(t, err)
require.True(t, admitted)
require.Len(t, controller.Inspect(ctx), 1)
require.Equal(t, controller.Inspect(ctx)[0],
makeInspectStream(1, 8<<20 /* 8MiB */, 16<<20 /* 16 MiB */))
Expand All @@ -211,7 +213,9 @@ func TestInspectController(t *testing.T) {

// Connect another stream, s1/s2, and ensure it shows up in the Inspect()
// state.
require.NoError(t, controller.Admit(ctx, admissionpb.BulkNormalPri, time.Time{}, makeConnectedStream(2)))
admitted, err = controller.Admit(ctx, admissionpb.BulkNormalPri, time.Time{}, makeConnectedStream(2))
require.NoError(t, err)
require.True(t, admitted)
require.Len(t, controller.Inspect(ctx), 2)
require.Equal(t, controller.Inspect(ctx)[1],
makeInspectStream(2, 8<<20 /* 8MiB */, 16<<20 /* 16 MiB */))
Expand Down
19 changes: 13 additions & 6 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func New(
var _ kvflowcontrol.Handle = &Handle{}

// Admit is part of the kvflowcontrol.Handle interface.
func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct time.Time) error {
func (h *Handle) Admit(
ctx context.Context, pri admissionpb.WorkPriority, ct time.Time,
) (bool, error) {
if h == nil {
// TODO(irfansharif): This can happen if we're proposing immediately on
// a newly split off RHS that doesn't know it's a leader yet (so we
Expand All @@ -92,14 +94,14 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim
// As for cluster settings that disable flow control entirely or only
// for regular traffic, that can be dealt with at the caller by not
// calling .Admit() and ensuring we use the right raft entry encodings.
return nil
return false, nil
}

h.mu.Lock()
if h.mu.closed {
h.mu.Unlock()
log.Errorf(ctx, "operating on a closed handle")
return nil
return false, nil
}

// NB: We're using a copy-on-write scheme elsewhere to maintain this slice
Expand All @@ -115,15 +117,20 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim
h.metrics.onWaiting(class)
tstart := h.clock.PhysicalTime()

// NB: We track whether the last stream was subject to flow control, this
// helps us decide later if we should be deducting tokens for this work.
var admitted bool
for _, c := range connections {
if err := h.controller.Admit(ctx, pri, ct, c); err != nil {
var err error
admitted, err = h.controller.Admit(ctx, pri, ct, c)
if err != nil {
h.metrics.onErrored(class, h.clock.PhysicalTime().Sub(tstart))
return err
return false, err
}
}

h.metrics.onAdmitted(class, h.clock.PhysicalTime().Sub(tstart))
return nil
return admitted, nil
}

// DeductTokensFor is part of the kvflowcontrol.Handle interface.
Expand Down
18 changes: 15 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ func TestHandleAdmit(t *testing.T) {
// the goroutine is blocked.
admitCh := make(chan struct{})
go func() {
require.NoError(t, handle.Admit(ctx, admissionpb.NormalPri, time.Time{}))
admitted, err := handle.Admit(ctx, admissionpb.NormalPri, time.Time{})
require.NoError(t, err)
require.True(t, admitted)
close(admitCh)
}()

Expand Down Expand Up @@ -189,16 +191,26 @@ func TestFlowControlMode(t *testing.T) {
handle.ConnectStream(ctx, pos(0), stream)
handle.DeductTokensFor(ctx, admissionpb.NormalPri, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */))

mode := tc.mode // copy to avoid nogo error

// Invoke .Admit() for {regular,elastic} work in a separate
// goroutines, and test below whether the goroutines are blocked.
regularAdmitCh := make(chan struct{})
elasticAdmitCh := make(chan struct{})
go func() {
require.NoError(t, handle.Admit(ctx, admissionpb.NormalPri, time.Time{}))
admitted, err := handle.Admit(ctx, admissionpb.NormalPri, time.Time{})
require.NoError(t, err)
if mode == kvflowcontrol.ApplyToElastic {
require.False(t, admitted)
} else {
require.True(t, admitted)
}
close(regularAdmitCh)
}()
go func() {
require.NoError(t, handle.Admit(ctx, admissionpb.BulkNormalPri, time.Time{}))
admitted, err := handle.Admit(ctx, admissionpb.BulkNormalPri, time.Time{})
require.NoError(t, err)
require.True(t, admitted)
close(elasticAdmitCh)
}()

Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ type Noop struct{}
var _ kvflowcontrol.Handle = Noop{}

// Admit is part of the kvflowcontrol.Handle interface.
func (n Noop) Admit(ctx context.Context, priority admissionpb.WorkPriority, time time.Time) error {
return nil
func (n Noop) Admit(
ctx context.Context, priority admissionpb.WorkPriority, time time.Time,
) (bool, error) {
return false, nil
}

// DeductTokensFor is part of the kvflowcontrol.Handle interface.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,8 +1166,8 @@ var _ kvflowcontrol.Handle = &testFlowTokenHandle{}

func (t *testFlowTokenHandle) Admit(
ctx context.Context, priority admissionpb.WorkPriority, t2 time.Time,
) error {
return nil
) (bool, error) {
return false, nil
}

func (t *testFlowTokenHandle) DeductTokensFor(
Expand Down

0 comments on commit 9789da7

Please sign in to comment.