diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go
index 230c93970554..824861ad4437 100644
--- a/pkg/sql/distsql/server.go
+++ b/pkg/sql/distsql/server.go
@@ -203,6 +203,9 @@ func FlowVerIsCompatible(
 // setupFlow creates a Flow.
 //
 // Args:
+// reserved: Specifies the upfront memory reservation that the flow takes
+// ownership of. This account is already closed if an error is returned;
+// otherwise it will be closed through Flow.Cleanup.
 // localState: Specifies if the flow runs entirely on this node and, if it does,
 // specifies the txn and other attributes.
 //
@@ -212,23 +215,17 @@ func (ds *ServerImpl) setupFlow(
 	ctx context.Context,
 	parentSpan *tracing.Span,
 	parentMonitor *mon.BytesMonitor,
+	reserved *mon.BoundAccount,
 	req *execinfrapb.SetupFlowRequest,
 	rowSyncFlowConsumer execinfra.RowReceiver,
 	batchSyncFlowConsumer execinfra.BatchReceiver,
 	localState LocalState,
 ) (retCtx context.Context, _ flowinfra.Flow, _ execopnode.OpChains, retErr error) {
-	if !FlowVerIsCompatible(req.Version, execinfra.MinAcceptedVersion, execinfra.Version) {
-		err := errors.Errorf(
-			"version mismatch in flow request: %d; this node accepts %d through %d",
-			req.Version, execinfra.MinAcceptedVersion, execinfra.Version,
-		)
-		log.Warningf(ctx, "%v", err)
-		return ctx, nil, nil, err
-	}
-
 	var sp *tracing.Span          // will be Finish()ed by Flow.Cleanup()
 	var monitor *mon.BytesMonitor // will be closed in Flow.Cleanup()
-	var onFlowCleanup func()
+	onFlowCleanup := func() {
+		reserved.Close(retCtx)
+	}
 	// Make sure that we clean up all resources (which in the happy case are
 	// cleaned up in Flow.Cleanup()) if an error is encountered.
 	defer func() {
@@ -239,13 +236,20 @@ func (ds *ServerImpl) setupFlow(
 			if monitor != nil {
 				monitor.Stop(ctx)
 			}
-			if onFlowCleanup != nil {
-				onFlowCleanup()
-			}
+			onFlowCleanup()
 			retCtx = tracing.ContextWithSpan(ctx, nil)
 		}
 	}()
 
+	if !FlowVerIsCompatible(req.Version, execinfra.MinAcceptedVersion, execinfra.Version) {
+		err := errors.Errorf(
+			"version mismatch in flow request: %d; this node accepts %d through %d",
+			req.Version, execinfra.MinAcceptedVersion, execinfra.Version,
+		)
+		log.Warningf(ctx, "%v", err)
+		return ctx, nil, nil, err
+	}
+
 	const opName = "flow"
 	if parentSpan == nil {
 		ctx, sp = ds.Tracer.StartSpanCtx(ctx, opName)
@@ -275,7 +279,7 @@ func (ds *ServerImpl) setupFlow(
 		noteworthyMemoryUsageBytes, ds.Settings,
 	)
-	monitor.StartNoReserved(ctx, parentMonitor)
+	monitor.Start(ctx, parentMonitor, reserved)
 
 	makeLeaf := func() (*kv.Txn, error) {
 		tis := req.LeafTxnInputState
@@ -307,9 +311,11 @@ func (ds *ServerImpl) setupFlow(
 		// the original state in order to avoid performance regressions.
 		origMon := evalCtx.Mon
 		origTxn := evalCtx.Txn
+		oldOnFlowCleanup := onFlowCleanup
 		onFlowCleanup = func() {
 			evalCtx.Mon = origMon
 			evalCtx.Txn = origTxn
+			oldOnFlowCleanup()
 		}
 		evalCtx.Mon = monitor
 		if localState.MustUseLeafTxn() {
@@ -559,7 +565,8 @@ func (ds *ServerImpl) SetupLocalSyncFlow(
 	localState LocalState,
 ) (context.Context, flowinfra.Flow, execopnode.OpChains, error) {
 	ctx, f, opChains, err := ds.setupFlow(
-		ctx, tracing.SpanFromContext(ctx), parentMonitor, req, output, batchOutput, localState,
+		ctx, tracing.SpanFromContext(ctx), parentMonitor, &mon.BoundAccount{}, /* reserved */
+		req, output, batchOutput, localState,
 	)
 	if err != nil {
 		return nil, nil, nil, err
@@ -615,14 +622,24 @@ func (ds *ServerImpl) SetupFlow(
 	// Note: the passed context will be canceled when this RPC completes, so we
 	// can't associate it with the flow.
 	ctx = ds.AnnotateCtx(context.Background())
-	ctx, f, _, err := ds.setupFlow(
-		ctx, rpcSpan, ds.memMonitor, req, nil, /* rowSyncFlowConsumer */
-		nil /* batchSyncFlowConsumer */, LocalState{},
-	)
-	if err == nil {
-		err = ds.flowScheduler.ScheduleFlow(ctx, f)
-	}
-	if err != nil {
+	if err := func() error {
+		// Reserve some memory for this remote flow, which serves as a poor
+		// man's admission control based on RAM usage.
+		reserved := ds.memMonitor.MakeBoundAccount()
+		err := reserved.Grow(ctx, mon.DefaultPoolAllocationSize)
+		if err != nil {
+			return err
+		}
+		var f flowinfra.Flow
+		ctx, f, _, err = ds.setupFlow(
+			ctx, rpcSpan, ds.memMonitor, &reserved, req, nil, /* rowSyncFlowConsumer */
+			nil /* batchSyncFlowConsumer */, LocalState{},
+		)
+		if err != nil {
+			return err
+		}
+		return ds.flowScheduler.ScheduleFlow(ctx, f)
+	}(); err != nil {
 		// We return flow deployment errors in the response so that they are
 		// packaged correctly over the wire. If we return them directly to this
 		// function, they become part of an rpc error.
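The pattern the patch introduces, in brief: SetupFlow grows a BoundAccount against the server's memory monitor before doing any setup work (so a node under memory pressure rejects new remote flows up front), then hands ownership of that reservation to setupFlow, which closes the account on error or folds it into the flow's monitor so it is released in Flow.Cleanup. Below is a minimal, self-contained sketch of that ownership hand-off. The Monitor and BoundAccount types here are simplified stand-ins invented for illustration, not the real pkg/util/mon API, and the byte counts and the failSetup knob are arbitrary choices rather than mon.DefaultPoolAllocationSize or real failure modes.

package main

import (
	"errors"
	"fmt"
)

// Monitor models a fixed memory budget. The real mon.BytesMonitor is far
// richer (monitor hierarchies, metrics, cluster settings); this stand-in
// only models reserving and releasing bytes.
type Monitor struct {
	budget, used int64
}

// BoundAccount is a slice of a Monitor's budget owned by one consumer.
type BoundAccount struct {
	mon  *Monitor
	size int64
}

// Grow reserves n more bytes, failing when the budget is exhausted. Failing
// here, before any setup work happens, is the "poor man's admission control"
// the patch comment refers to.
func (a *BoundAccount) Grow(n int64) error {
	if a.mon.used+n > a.mon.budget {
		return errors.New("memory budget exceeded")
	}
	a.mon.used += n
	a.size += n
	return nil
}

// Close returns the reservation to the monitor. It is a no-op on an empty
// account, which is why a caller with nothing reserved (the local-flow path,
// SetupLocalSyncFlow) can simply pass &BoundAccount{}.
func (a *BoundAccount) Close() {
	if a.mon != nil {
		a.mon.used -= a.size
		a.size = 0
	}
}

// setupFlow takes ownership of reserved: the account is closed here if an
// error is returned, and otherwise by the returned cleanup function,
// mirroring the contract documented on setupFlow in the patch.
func setupFlow(reserved *BoundAccount, failSetup bool) (cleanup func(), err error) {
	defer func() {
		if err != nil {
			reserved.Close()
		}
	}()
	if failSetup { // stand-in for any mid-setup failure
		return nil, errors.New("flow setup failed")
	}
	return reserved.Close, nil
}

func main() {
	parent := &Monitor{budget: 20 << 10}

	// Remote-flow path: reserve upfront, then hand the account off.
	reserved := BoundAccount{mon: parent}
	if err := reserved.Grow(10 << 10); err != nil {
		fmt.Println("flow rejected:", err)
		return
	}
	cleanup, err := setupFlow(&reserved, false)
	if err != nil {
		fmt.Println("setup error:", err) // reserved is already closed
		return
	}
	fmt.Println("bytes in use:", parent.used) // 10240
	cleanup()                                 // what Flow.Cleanup would do
	fmt.Println("bytes in use:", parent.used) // 0
}

In the patch itself the happy-path hand-off is slightly different from this sketch: rather than keeping the account open alongside the flow, monitor.Start(ctx, parentMonitor, reserved) seeds the flow's own monitor with the reservation, and the onFlowCleanup closure (invoked from Flow.Cleanup) is what finally calls reserved.Close. Note also how the patch chains cleanup closures (oldOnFlowCleanup := onFlowCleanup; ...) so that restoring evalCtx and releasing the reservation both run.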