Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvadmission: fix handling of non-elastic work with flow control #109446

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/flow_control_replica_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,9 @@ func newMockFlowHandle(

func (m *mockFlowHandle) Admit(
ctx context.Context, pri admissionpb.WorkPriority, ct time.Time,
) error {
) (bool, error) {
m.t.Fatal("unimplemented")
return nil
return false, nil
}

func (m *mockFlowHandle) DeductTokensFor(
Expand Down
42 changes: 27 additions & 15 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,29 +310,41 @@ func (n *controllerImpl) AdmitKVWork(
// to continue even when throttling since there are often significant
// number of tokens available.
if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() {
if !bypassAdmission &&
kvflowcontrol.Enabled.Get(&n.settings.SV) &&
n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings) {
var admitted bool
attemptFlowControl := kvflowcontrol.Enabled.Get(&n.settings.SV) &&
n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings)
if attemptFlowControl && !bypassAdmission {
kvflowHandle, found := n.kvflowHandles.Lookup(ba.RangeID)
if !found {
return Handle{}, nil
}
if err := kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime)); err != nil {
var err error
admitted, err = kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime))
if err != nil {
return Handle{}, err
} else if admitted {
// NB: It's possible for us to be waiting for available flow tokens
// for a different set of streams than the ones we'll eventually
// deduct tokens from, if the range experiences a split between now
// and the point of deduction. That's ok, there's no strong
// synchronization needed between these two points.
ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{
AdmissionPriority: int32(admissionInfo.Priority),
AdmissionCreateTime: admissionInfo.CreateTime,
AdmissionOriginNode: n.nodeID.Get(),
}
}
// NB: It's possible for us to be waiting for available flow tokens
// for a different set of streams than the ones we'll eventually
// deduct tokens from, if the range experiences a split between now
// and the point of deduction. That's ok, there's no strong
// synchronization needed between these two points.
ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{
AdmissionPriority: int32(admissionInfo.Priority),
AdmissionCreateTime: admissionInfo.CreateTime,
AdmissionOriginNode: n.nodeID.Get(),
}
} else {

}
// If flow control is disabled or if work bypasses flow control, we still
// subject it to above-raft, leaseholder-only IO admission control.
if !attemptFlowControl || !admitted {
storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID))
if storeAdmissionQ != nil {
// NB: Even though we would know here we're bypassing admission (via
// `bypassAdmission`), we still have to explicitly invoke `.Admit()`.
// We do it for correct token accounting (i.e. we deduct tokens without
// blocking).
storeWorkHandle, err := storeAdmissionQ.Admit(
ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo})
if err != nil {
Expand Down
26 changes: 11 additions & 15 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ const (
// virtually enqueued in below-raft admission queues and dequeued in
// priority order, but only empty elastic flow token buckets above-raft will
// block further elastic traffic from being admitted.
//
// TODO(irfansharif): We're potentially risking OOMs doing all this tracking
// for regular work, without coalescing state. With a bit of plumbing, for
// requests that bypass flow control we could fallback to using the non-AC
// raft encodings and avoid the potential OOMs. Address this as part of
// #95563.
ApplyToElastic ModeT = iota
// ApplyToAll uses flow control for both elastic and regular traffic,
// i.e. all work will wait for flow tokens to be available.
Expand Down Expand Up @@ -117,11 +111,12 @@ type Tokens int64
// Controller provides flow control for replication traffic in KV, held at the
// node-level.
type Controller interface {
// Admit seeks admission to replicate data, regardless of size, for work
// with the given priority, create-time, and over the given stream. This
// blocks until there are flow tokens available or the stream disconnects,
// subject to context cancellation.
Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) error
// Admit seeks admission to replicate data, regardless of size, for work with
// the given priority, create-time, and over the given stream. This blocks
// until there are flow tokens available or the stream disconnects, subject to
// context cancellation. This returns true if the request was admitted through
// flow control. Ignore the first return value if err != nil.
Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) (admitted bool, _ error)
// DeductTokens deducts (without blocking) flow tokens for replicating work
// with given priority over the given stream. Requests are expected to
// have been Admit()-ed first.
Expand Down Expand Up @@ -158,10 +153,11 @@ type Controller interface {
// given priority, takes log position into account -- see
// kvflowcontrolpb.AdmittedRaftLogEntries for more details).
type Handle interface {
// Admit seeks admission to replicate data, regardless of size, for work
// with the given priority and create-time. This blocks until there are
// flow tokens available for all connected streams.
Admit(context.Context, admissionpb.WorkPriority, time.Time) error
// Admit seeks admission to replicate data, regardless of size, for work with
// the given priority and create-time. This blocks until there are flow tokens
// available for all connected streams. This returns true if the request was
// admitted through flow control. Ignore the first return value if err != nil.
Admit(context.Context, admissionpb.WorkPriority, time.Time) (admitted bool, _ error)
// DeductTokensFor deducts (without blocking) flow tokens for replicating
// work with given priority along connected streams. The deduction is
// tracked with respect to the specific raft log position it's expecting it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (c *Controller) Admit(
pri admissionpb.WorkPriority,
_ time.Time,
connection kvflowcontrol.ConnectedStream,
) error {
) (bool, error) {
class := admissionpb.WorkClassFromPri(pri)
c.metrics.onWaiting(class)

Expand All @@ -148,12 +148,11 @@ func (c *Controller) Admit(
c.mu.Unlock()

tokens := b.tokens(class)
if tokens > 0 ||
// In addition to letting requests through when there are tokens
// available, we'll also let them through if we're not
// applying flow control to their specific work class.
c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass {

// In addition to letting requests through when there are tokens
// available, we'll also let them through if we're not
// applying flow control to their specific work class.
bypass := c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass
if tokens > 0 || bypass {
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "admitted request (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)",
pri, connection.Stream(), tokens, c.clock.PhysicalTime().Sub(tstart), c.mode())
Expand All @@ -179,7 +178,10 @@ func (c *Controller) Admit(

b.signal() // signal a waiter, if any
c.metrics.onAdmitted(class, c.clock.PhysicalTime().Sub(tstart))
return nil
if bypass {
return false, nil
}
return true, nil
}

if !logged && log.ExpensiveLogEnabled(ctx, 2) {
Expand All @@ -192,12 +194,12 @@ func (c *Controller) Admit(
case <-b.wait(): // wait for a signal
case <-connection.Disconnected():
c.metrics.onBypassed(class, c.clock.PhysicalTime().Sub(tstart))
return nil
return true, nil
case <-ctx.Done():
if ctx.Err() != nil {
c.metrics.onErrored(class, c.clock.PhysicalTime().Sub(tstart))
}
return ctx.Err()
return false, ctx.Err()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ func TestInspectController(t *testing.T) {

// Set up a single connected stream, s1/t1, and ensure it shows up in the
// Inspect() state.
require.NoError(t, controller.Admit(ctx, admissionpb.NormalPri, time.Time{}, makeConnectedStream(1)))
admitted, err := controller.Admit(ctx, admissionpb.NormalPri, time.Time{}, makeConnectedStream(1))
require.NoError(t, err)
require.True(t, admitted)
require.Len(t, controller.Inspect(ctx), 1)
require.Equal(t, controller.Inspect(ctx)[0],
makeInspectStream(1, 8<<20 /* 8MiB */, 16<<20 /* 16 MiB */))
Expand All @@ -211,7 +213,9 @@ func TestInspectController(t *testing.T) {

// Connect another stream, s1/s2, and ensure it shows up in the Inspect()
// state.
require.NoError(t, controller.Admit(ctx, admissionpb.BulkNormalPri, time.Time{}, makeConnectedStream(2)))
admitted, err = controller.Admit(ctx, admissionpb.BulkNormalPri, time.Time{}, makeConnectedStream(2))
require.NoError(t, err)
require.True(t, admitted)
require.Len(t, controller.Inspect(ctx), 2)
require.Equal(t, controller.Inspect(ctx)[1],
makeInspectStream(2, 8<<20 /* 8MiB */, 16<<20 /* 16 MiB */))
Expand Down
19 changes: 13 additions & 6 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func New(
var _ kvflowcontrol.Handle = &Handle{}

// Admit is part of the kvflowcontrol.Handle interface.
func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct time.Time) error {
func (h *Handle) Admit(
ctx context.Context, pri admissionpb.WorkPriority, ct time.Time,
) (bool, error) {
if h == nil {
// TODO(irfansharif): This can happen if we're proposing immediately on
// a newly split off RHS that doesn't know it's a leader yet (so we
Expand All @@ -92,14 +94,14 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim
// As for cluster settings that disable flow control entirely or only
// for regular traffic, that can be dealt with at the caller by not
// calling .Admit() and ensuring we use the right raft entry encodings.
return nil
return false, nil
}

h.mu.Lock()
if h.mu.closed {
h.mu.Unlock()
log.Errorf(ctx, "operating on a closed handle")
return nil
return false, nil
}

// NB: We're using a copy-on-write scheme elsewhere to maintain this slice
Expand All @@ -115,15 +117,20 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim
h.metrics.onWaiting(class)
tstart := h.clock.PhysicalTime()

// NB: We track whether the last stream was subject to flow control; this
// helps us decide later if we should be deducting tokens for this work.
var admitted bool
for _, c := range connections {
if err := h.controller.Admit(ctx, pri, ct, c); err != nil {
var err error
admitted, err = h.controller.Admit(ctx, pri, ct, c)
if err != nil {
h.metrics.onErrored(class, h.clock.PhysicalTime().Sub(tstart))
return err
return false, err
}
}

h.metrics.onAdmitted(class, h.clock.PhysicalTime().Sub(tstart))
return nil
return admitted, nil
}

// DeductTokensFor is part of the kvflowcontrol.Handle interface.
Expand Down
18 changes: 15 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ func TestHandleAdmit(t *testing.T) {
// the goroutine is blocked.
admitCh := make(chan struct{})
go func() {
require.NoError(t, handle.Admit(ctx, admissionpb.NormalPri, time.Time{}))
admitted, err := handle.Admit(ctx, admissionpb.NormalPri, time.Time{})
require.NoError(t, err)
require.True(t, admitted)
close(admitCh)
}()

Expand Down Expand Up @@ -189,16 +191,26 @@ func TestFlowControlMode(t *testing.T) {
handle.ConnectStream(ctx, pos(0), stream)
handle.DeductTokensFor(ctx, admissionpb.NormalPri, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */))

mode := tc.mode // copy to avoid nogo error

// Invoke .Admit() for {regular,elastic} work in separate
// goroutines, and test below whether the goroutines are blocked.
regularAdmitCh := make(chan struct{})
elasticAdmitCh := make(chan struct{})
go func() {
require.NoError(t, handle.Admit(ctx, admissionpb.NormalPri, time.Time{}))
admitted, err := handle.Admit(ctx, admissionpb.NormalPri, time.Time{})
require.NoError(t, err)
if mode == kvflowcontrol.ApplyToElastic {
require.False(t, admitted)
} else {
require.True(t, admitted)
}
close(regularAdmitCh)
}()
go func() {
require.NoError(t, handle.Admit(ctx, admissionpb.BulkNormalPri, time.Time{}))
admitted, err := handle.Admit(ctx, admissionpb.BulkNormalPri, time.Time{})
require.NoError(t, err)
require.True(t, admitted)
close(elasticAdmitCh)
}()

Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ type Noop struct{}
var _ kvflowcontrol.Handle = Noop{}

// Admit is part of the kvflowcontrol.Handle interface.
func (n Noop) Admit(ctx context.Context, priority admissionpb.WorkPriority, time time.Time) error {
return nil
func (n Noop) Admit(
ctx context.Context, priority admissionpb.WorkPriority, time time.Time,
) (bool, error) {
return false, nil
}

// DeductTokensFor is part of the kvflowcontrol.Handle interface.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,8 +1166,8 @@ var _ kvflowcontrol.Handle = &testFlowTokenHandle{}

func (t *testFlowTokenHandle) Admit(
ctx context.Context, priority admissionpb.WorkPriority, t2 time.Time,
) error {
return nil
) (bool, error) {
return false, nil
}

func (t *testFlowTokenHandle) DeductTokensFor(
Expand Down