Skip to content

Commit

Permalink
distsql: reserve 10KiB memory on admission of remote flow
Browse files Browse the repository at this point in the history
This commit makes it so that we reserve 10KiB of memory when scheduling
each remote DistSQL flow. The idea is that a read-only remote flow
(since we don't distribute writes) will need to account for some memory
anyway, so we might as well make a small reservation upfront. This would
serve as a form of admission control "based" on the RAM usage.

Release note: None
  • Loading branch information
yuzefovich committed Jul 22, 2022
1 parent 02727e3 commit 9692b54
Showing 1 changed file with 40 additions and 23 deletions.
63 changes: 40 additions & 23 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ func FlowVerIsCompatible(
// setupFlow creates a Flow.
//
// Args:
// reserved: Specifies the upfront memory reservation that the flow takes
// ownership of. This account is already closed if an error is returned or
// will be closed through Flow.Cleanup.
// localState: Specifies if the flow runs entirely on this node and, if it does,
// specifies the txn and other attributes.
//
Expand All @@ -212,23 +215,17 @@ func (ds *ServerImpl) setupFlow(
ctx context.Context,
parentSpan *tracing.Span,
parentMonitor *mon.BytesMonitor,
reserved *mon.BoundAccount,
req *execinfrapb.SetupFlowRequest,
rowSyncFlowConsumer execinfra.RowReceiver,
batchSyncFlowConsumer execinfra.BatchReceiver,
localState LocalState,
) (retCtx context.Context, _ flowinfra.Flow, _ execopnode.OpChains, retErr error) {
if !FlowVerIsCompatible(req.Version, execinfra.MinAcceptedVersion, execinfra.Version) {
err := errors.Errorf(
"version mismatch in flow request: %d; this node accepts %d through %d",
req.Version, execinfra.MinAcceptedVersion, execinfra.Version,
)
log.Warningf(ctx, "%v", err)
return ctx, nil, nil, err
}

var sp *tracing.Span // will be Finish()ed by Flow.Cleanup()
var monitor *mon.BytesMonitor // will be closed in Flow.Cleanup()
var onFlowCleanup func()
onFlowCleanup := func() {
reserved.Close(retCtx)
}
// Make sure that we clean up all resources (which in the happy case are
// cleaned up in Flow.Cleanup()) if an error is encountered.
defer func() {
Expand All @@ -239,13 +236,20 @@ func (ds *ServerImpl) setupFlow(
if monitor != nil {
monitor.Stop(ctx)
}
if onFlowCleanup != nil {
onFlowCleanup()
}
onFlowCleanup()
retCtx = tracing.ContextWithSpan(ctx, nil)
}
}()

if !FlowVerIsCompatible(req.Version, execinfra.MinAcceptedVersion, execinfra.Version) {
err := errors.Errorf(
"version mismatch in flow request: %d; this node accepts %d through %d",
req.Version, execinfra.MinAcceptedVersion, execinfra.Version,
)
log.Warningf(ctx, "%v", err)
return ctx, nil, nil, err
}

const opName = "flow"
if parentSpan == nil {
ctx, sp = ds.Tracer.StartSpanCtx(ctx, opName)
Expand Down Expand Up @@ -275,7 +279,7 @@ func (ds *ServerImpl) setupFlow(
noteworthyMemoryUsageBytes,
ds.Settings,
)
monitor.StartNoReserved(ctx, parentMonitor)
monitor.Start(ctx, parentMonitor, reserved)

makeLeaf := func() (*kv.Txn, error) {
tis := req.LeafTxnInputState
Expand Down Expand Up @@ -307,9 +311,11 @@ func (ds *ServerImpl) setupFlow(
// the original state in order to avoid performance regressions.
origMon := evalCtx.Mon
origTxn := evalCtx.Txn
oldOnFlowCleanup := onFlowCleanup
onFlowCleanup = func() {
evalCtx.Mon = origMon
evalCtx.Txn = origTxn
oldOnFlowCleanup()
}
evalCtx.Mon = monitor
if localState.MustUseLeafTxn() {
Expand Down Expand Up @@ -559,7 +565,8 @@ func (ds *ServerImpl) SetupLocalSyncFlow(
localState LocalState,
) (context.Context, flowinfra.Flow, execopnode.OpChains, error) {
ctx, f, opChains, err := ds.setupFlow(
ctx, tracing.SpanFromContext(ctx), parentMonitor, req, output, batchOutput, localState,
ctx, tracing.SpanFromContext(ctx), parentMonitor, &mon.BoundAccount{}, /* reserved */
req, output, batchOutput, localState,
)
if err != nil {
return nil, nil, nil, err
Expand Down Expand Up @@ -615,14 +622,24 @@ func (ds *ServerImpl) SetupFlow(
// Note: the passed context will be canceled when this RPC completes, so we
// can't associate it with the flow.
ctx = ds.AnnotateCtx(context.Background())
ctx, f, _, err := ds.setupFlow(
ctx, rpcSpan, ds.memMonitor, req, nil, /* rowSyncFlowConsumer */
nil /* batchSyncFlowConsumer */, LocalState{},
)
if err == nil {
err = ds.flowScheduler.ScheduleFlow(ctx, f)
}
if err != nil {
if err := func() error {
// Reserve some memory for this remote flow which is a poor man's
// admission control based on the RAM usage.
reserved := ds.memMonitor.MakeBoundAccount()
err := reserved.Grow(ctx, mon.DefaultPoolAllocationSize)
if err != nil {
return err
}
var f flowinfra.Flow
ctx, f, _, err = ds.setupFlow(
ctx, rpcSpan, ds.memMonitor, &reserved, req, nil, /* rowSyncFlowConsumer */
nil /* batchSyncFlowConsumer */, LocalState{},
)
if err != nil {
return err
}
return ds.flowScheduler.ScheduleFlow(ctx, f)
}(); err != nil {
// We return flow deployment errors in the response so that they are
// packaged correctly over the wire. If we return them directly to this
// function, they become part of an rpc error.
Expand Down

0 comments on commit 9692b54

Please sign in to comment.