diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 884111c790e7..f5be6adb6598 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -63,12 +63,14 @@ func checkNumIn(inputs []colexecargs.OpWithMetaInfo, numIn int) error { // execution flow and returns toWrap's output as an Operator. // - materializerSafeToRelease indicates whether the materializers created in // order to row-sourcify the inputs are safe to be released on the flow cleanup. +// - streamingMemAccFactory must be non-nil, and in the production setting must +// return separate accounts on each invocation. func wrapRowSources( ctx context.Context, flowCtx *execinfra.FlowCtx, inputs []colexecargs.OpWithMetaInfo, inputTypes [][]*types.T, - streamingMemAccount *mon.BoundAccount, + streamingMemAccFactory func() *mon.BoundAccount, processorID int32, newToWrap func([]execinfra.RowSource) (execinfra.RowSource, error), materializerSafeToRelease bool, @@ -88,6 +90,7 @@ func wrapRowSources( toWrapInputs = append(toWrapInputs, c.Input()) } else { toWrapInput := colexec.NewMaterializer( + colmem.NewAllocator(ctx, streamingMemAccFactory(), factory), flowCtx, processorID, inputs[i], @@ -119,11 +122,11 @@ func wrapRowSources( var c *colexec.Columnarizer if proc.MustBeStreaming() { c = colexec.NewStreamingColumnarizer( - colmem.NewAllocator(ctx, streamingMemAccount, factory), flowCtx, processorID, toWrap, + colmem.NewAllocator(ctx, streamingMemAccFactory(), factory), flowCtx, processorID, toWrap, ) } else { c = colexec.NewBufferingColumnarizer( - colmem.NewAllocator(ctx, streamingMemAccount, factory), flowCtx, processorID, toWrap, + colmem.NewAllocator(ctx, streamingMemAccFactory(), factory), flowCtx, processorID, toWrap, ) } return c, releasables, nil @@ -538,12 +541,18 @@ func (r opResult) createAndWrapRowSource( // LocalPlanNode cores which is the case when we have non-empty // LocalProcessors. materializerSafeToRelease := len(args.LocalProcessors) == 0 + streamingMemAccFactory := args.StreamingMemAccFactory + if streamingMemAccFactory == nil { + streamingMemAccFactory = func() *mon.BoundAccount { + return args.StreamingMemAccount + } + } c, releasables, err := wrapRowSources( ctx, flowCtx, inputs, inputTypes, - args.StreamingMemAccount, + streamingMemAccFactory, processorID, func(inputs []execinfra.RowSource) (execinfra.RowSource, error) { // We provide a slice with a single nil as 'outputs' parameter diff --git a/pkg/sql/colexec/colbuilder/execplan_test.go b/pkg/sql/colexec/colbuilder/execplan_test.go index c8d9c09ab8c8..28582acba521 100644 --- a/pkg/sql/colexec/colbuilder/execplan_test.go +++ b/pkg/sql/colexec/colbuilder/execplan_test.go @@ -137,6 +137,7 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) { defer r.TestCleanupNoError(t) m := colexec.NewMaterializer( + nil, /* allocator */ flowCtx, 0, /* processorID */ r.OpWithMetaInfo, diff --git a/pkg/sql/colexec/colexecargs/op_creation.go b/pkg/sql/colexec/colexecargs/op_creation.go index 41301acf789c..64403f793bdd 100644 --- a/pkg/sql/colexec/colexecargs/op_creation.go +++ b/pkg/sql/colexec/colexecargs/op_creation.go @@ -54,17 +54,18 @@ type OpWithMetaInfo struct { // NewColOperatorArgs is a helper struct that encompasses all of the input // arguments to NewColOperator call. type NewColOperatorArgs struct { - Spec *execinfrapb.ProcessorSpec - Inputs []OpWithMetaInfo - StreamingMemAccount *mon.BoundAccount - ProcessorConstructor execinfra.ProcessorConstructor - LocalProcessors []execinfra.LocalProcessor - DiskQueueCfg colcontainer.DiskQueueCfg - FDSemaphore semaphore.Semaphore - ExprHelper *ExprHelper - Factory coldata.ColumnFactory - MonitorRegistry *MonitorRegistry - TestingKnobs struct { + Spec *execinfrapb.ProcessorSpec + Inputs []OpWithMetaInfo + StreamingMemAccount *mon.BoundAccount + StreamingMemAccFactory func() *mon.BoundAccount + ProcessorConstructor execinfra.ProcessorConstructor + LocalProcessors []execinfra.LocalProcessor + DiskQueueCfg colcontainer.DiskQueueCfg + FDSemaphore semaphore.Semaphore + ExprHelper *ExprHelper + Factory coldata.ColumnFactory + MonitorRegistry *MonitorRegistry + TestingKnobs struct { // SpillingCallbackFn will be called when the spilling from an in-memory // to disk-backed operator occurs. It should only be set in tests. SpillingCallbackFn func() diff --git a/pkg/sql/colexec/colexecbase/simple_project_test.go b/pkg/sql/colexec/colexecbase/simple_project_test.go index 45226711bc3c..7da4e9684966 100644 --- a/pkg/sql/colexec/colexecbase/simple_project_test.go +++ b/pkg/sql/colexec/colexecbase/simple_project_test.go @@ -121,7 +121,7 @@ func TestSimpleProjectOpWithUnorderedSynchronizer(t *testing.T) { for i := range parallelUnorderedSynchronizerInputs { parallelUnorderedSynchronizerInputs[i].Root = inputs[i] } - input = colexec.NewParallelUnorderedSynchronizer(parallelUnorderedSynchronizerInputs, &wg) + input = colexec.NewParallelUnorderedSynchronizer(testAllocator, parallelUnorderedSynchronizerInputs, &wg) input = colexecbase.NewSimpleProjectOp(input, len(inputTypes), []uint32{0}) return colexecbase.NewConstOp(testAllocator, input, types.Int, constVal, 1) }) diff --git a/pkg/sql/colexec/colexecutils/BUILD.bazel b/pkg/sql/colexec/colexecutils/BUILD.bazel index 336d42e3519c..3bd9ed7ebaf5 100644 --- a/pkg/sql/colexec/colexecutils/BUILD.bazel +++ b/pkg/sql/colexec/colexecutils/BUILD.bazel @@ -18,10 +18,12 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/col/coldata", + "//pkg/roachpb", "//pkg/sql/colcontainer", "//pkg/sql/colexecerror", "//pkg/sql/colexecop", "//pkg/sql/colmem", + "//pkg/sql/execinfrapb", "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/sql/sqlerrors", diff --git a/pkg/sql/colexec/colexecutils/utils.go b/pkg/sql/colexec/colexecutils/utils.go index 8d033f0023d3..7d2a1b7ca65c 100644 --- a/pkg/sql/colexec/colexecutils/utils.go +++ b/pkg/sql/colexec/colexecutils/utils.go @@ -12,8 +12,10 @@ package colexecutils import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" @@ -318,3 +320,15 @@ func init() { DefaultSelectionVector[i] = i } } + +// AccountForMetadata registers the memory footprint of meta with the allocator. +func AccountForMetadata(allocator *colmem.Allocator, meta []execinfrapb.ProducerMetadata) { + for i := range meta { + // Perform the memory accounting for the LeafTxnFinalState metadata + // since it might be of non-trivial size. + if ltfs := meta[i].LeafTxnFinalState; ltfs != nil { + memUsage := roachpb.Spans(ltfs.RefreshSpans).MemUsage() + allocator.AdjustMemoryUsage(memUsage) + } + } +} diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 9042e57f6a71..954d88b61d73 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils" "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" @@ -77,6 +78,7 @@ var _ colexecop.VectorizedStatsCollector = &Columnarizer{} // NewBufferingColumnarizer returns a new Columnarizer that will be buffering up // rows before emitting them as output batches. +// - allocator must use a memory account that is not shared with any other user. func NewBufferingColumnarizer( allocator *colmem.Allocator, flowCtx *execinfra.FlowCtx, @@ -88,6 +90,7 @@ func NewBufferingColumnarizer( // NewStreamingColumnarizer returns a new Columnarizer that emits every input // row as a separate batch. +// - allocator must use a memory account that is not shared with any other user. func NewStreamingColumnarizer( allocator *colmem.Allocator, flowCtx *execinfra.FlowCtx, @@ -233,6 +236,7 @@ func (c *Columnarizer) Next() coldata.Batch { colexecerror.ExpectedError(meta.Err) } c.accumulatedMeta = append(c.accumulatedMeta, *meta) + colexecutils.AccountForMetadata(c.allocator, c.accumulatedMeta[len(c.accumulatedMeta)-1:]) continue } if row == nil { @@ -266,12 +270,22 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata { if c.removedFromFlow { return nil } + // We no longer need the batch. + c.batch = nil + bufferedMeta := c.accumulatedMeta + // Eagerly lose the reference to the metadata since it might be of + // non-trivial footprint. + c.accumulatedMeta = nil + // When this method returns, we no longer will have the reference to the + // metadata (nor to the batch), so we can release all memory from the + // allocator. + defer c.allocator.ReleaseAll() if c.Ctx == nil { // The columnarizer wasn't initialized, so the wrapped processors might // not have been started leaving them in an unsafe to drain state, so // we skip the draining. Mostly likely this happened because a panic was // encountered in Init. - return c.accumulatedMeta + return bufferedMeta } c.MoveToDraining(nil /* err */) for { @@ -279,9 +293,9 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata { if meta == nil { break } - c.accumulatedMeta = append(c.accumulatedMeta, *meta) + bufferedMeta = append(bufferedMeta, *meta) } - return c.accumulatedMeta + return bufferedMeta } // Close is part of the colexecop.ClosableOperator interface. diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index 5844e7534b7d..15eadf29d06c 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -17,8 +17,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colconv" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs" + "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils" "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode" "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable" @@ -73,10 +75,14 @@ type drainHelper struct { // are noops. ctx context.Context + // allocator can be nil in tests. + allocator *colmem.Allocator + statsCollectors []colexecop.VectorizedStatsCollector sources colexecop.MetadataSources - bufferedMeta []execinfrapb.ProducerMetadata + drained bool + meta []execinfrapb.ProducerMetadata } var _ execinfra.RowSource = &drainHelper{} @@ -113,18 +119,25 @@ func (d *drainHelper) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) } d.statsCollectors = nil } - if d.bufferedMeta == nil { - d.bufferedMeta = d.sources.DrainMeta() - if d.bufferedMeta == nil { - // Still nil, avoid more calls to DrainMeta. - d.bufferedMeta = []execinfrapb.ProducerMetadata{} + if !d.drained { + d.meta = d.sources.DrainMeta() + d.drained = true + if d.allocator != nil { + colexecutils.AccountForMetadata(d.allocator, d.meta) } } - if len(d.bufferedMeta) == 0 { + if len(d.meta) == 0 { + // Eagerly lose the reference to the slice. + d.meta = nil + if d.allocator != nil { + // At this point, the caller took over all of the metadata, so we + // can release all of the allocations. + d.allocator.ReleaseAll() + } return nil, nil } - meta := d.bufferedMeta[0] - d.bufferedMeta = d.bufferedMeta[1:] + meta := d.meta[0] + d.meta = d.meta[1:] return nil, &meta } @@ -143,18 +156,22 @@ var materializerPool = sync.Pool{ // NewMaterializer creates a new Materializer processor which processes the // columnar data coming from input to return it as rows. // Arguments: +// - allocator must use a memory account that is not shared with any other user, +// can be nil in tests. // - typs is the output types schema. Typs are assumed to have been hydrated. -// - getStats (when tracing is enabled) returns all of the execution statistics -// of operators which the materializer is responsible for. // NOTE: the constructor does *not* take in an execinfrapb.PostProcessSpec // because we expect input to handle that for us. func NewMaterializer( - flowCtx *execinfra.FlowCtx, processorID int32, input colexecargs.OpWithMetaInfo, typs []*types.T, + allocator *colmem.Allocator, + flowCtx *execinfra.FlowCtx, + processorID int32, + input colexecargs.OpWithMetaInfo, + typs []*types.T, ) *Materializer { // When the materializer is created in the middle of the chain of operators, // it will modify the eval context when it is done draining, so we have to // give it a copy to preserve the "global" eval context from being mutated. - return newMaterializerInternal(flowCtx, flowCtx.NewEvalCtx(), processorID, input, typs) + return newMaterializerInternal(allocator, flowCtx, flowCtx.NewEvalCtx(), processorID, input, typs) } // NewMaterializerNoEvalCtxCopy is the same as NewMaterializer but doesn't make @@ -166,12 +183,17 @@ func NewMaterializer( // modifies the eval context) only when the whole flow is done, at which point // the eval context won't be used anymore. func NewMaterializerNoEvalCtxCopy( - flowCtx *execinfra.FlowCtx, processorID int32, input colexecargs.OpWithMetaInfo, typs []*types.T, + allocator *colmem.Allocator, + flowCtx *execinfra.FlowCtx, + processorID int32, + input colexecargs.OpWithMetaInfo, + typs []*types.T, ) *Materializer { - return newMaterializerInternal(flowCtx, flowCtx.EvalCtx, processorID, input, typs) + return newMaterializerInternal(allocator, flowCtx, flowCtx.EvalCtx, processorID, input, typs) } func newMaterializerInternal( + allocator *colmem.Allocator, flowCtx *execinfra.FlowCtx, evalCtx *eval.Context, processorID int32, @@ -192,6 +214,7 @@ func newMaterializerInternal( // rowSourceToPlanNode wrappers. closers: append(m.closers[:0], input.ToClose...), } + m.drainHelper.allocator = allocator m.drainHelper.statsCollectors = input.StatsCollectors m.drainHelper.sources = input.MetadataSources diff --git a/pkg/sql/colexec/materializer_test.go b/pkg/sql/colexec/materializer_test.go index d7c898ef03ce..94655a12f32d 100644 --- a/pkg/sql/colexec/materializer_test.go +++ b/pkg/sql/colexec/materializer_test.go @@ -60,6 +60,7 @@ func TestColumnarizeMaterialize(t *testing.T) { c := NewBufferingColumnarizer(testAllocator, flowCtx, 0, input) m := NewMaterializer( + nil, /* allocator */ flowCtx, 1, /* processorID */ colexecargs.OpWithMetaInfo{Root: c}, @@ -137,6 +138,7 @@ func BenchmarkMaterializer(b *testing.B) { b.SetBytes(int64(nRows * nCols * int(memsize.Int64))) for i := 0; i < b.N; i++ { m := NewMaterializer( + nil, /* allocator */ flowCtx, 0, /* processorID */ colexecargs.OpWithMetaInfo{Root: input}, @@ -184,6 +186,7 @@ func TestMaterializerNextErrorAfterConsumerDone(t *testing.T) { } m := NewMaterializer( + nil, /* allocator */ flowCtx, 0, /* processorID */ colexecargs.OpWithMetaInfo{ @@ -227,6 +230,7 @@ func BenchmarkColumnarizeMaterialize(b *testing.B) { b.SetBytes(int64(nRows * nCols * int(memsize.Int64))) for i := 0; i < b.N; i++ { m := NewMaterializer( + nil, /* allocator */ flowCtx, 1, /* processorID */ colexecargs.OpWithMetaInfo{Root: c}, diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer.go b/pkg/sql/colexec/parallel_unordered_synchronizer.go index 0efa2fda969f..f40a6c2d2b71 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer.go @@ -18,8 +18,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs" + "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils" "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -64,6 +66,7 @@ const ( type ParallelUnorderedSynchronizer struct { colexecop.InitHelper + allocator *colmem.Allocator inputs []colexecargs.OpWithMetaInfo inputCtxs []context.Context // cancelLocalInput stores context cancellation functions for each of the @@ -129,8 +132,9 @@ func (s *ParallelUnorderedSynchronizer) Child(nth int, verbose bool) execopnode. // increment the passed-in WaitGroup and decrement when done. It is also // guaranteed that these spawned goroutines will have completed on any error or // zero-length batch received from Next. +// - allocator must use a memory account that is not shared with any other user. func NewParallelUnorderedSynchronizer( - inputs []colexecargs.OpWithMetaInfo, wg *sync.WaitGroup, + allocator *colmem.Allocator, inputs []colexecargs.OpWithMetaInfo, wg *sync.WaitGroup, ) *ParallelUnorderedSynchronizer { readNextBatch := make([]chan struct{}, len(inputs)) for i := range readNextBatch { @@ -139,6 +143,7 @@ func NewParallelUnorderedSynchronizer( readNextBatch[i] = make(chan struct{}, 1) } return &ParallelUnorderedSynchronizer{ + allocator: allocator, inputs: inputs, inputCtxs: make([]context.Context, len(inputs)), cancelLocalInput: make([]context.CancelFunc, len(inputs)), @@ -379,6 +384,7 @@ func (s *ParallelUnorderedSynchronizer) Next() coldata.Batch { } s.lastReadInputIdx = msg.inputIdx if msg.meta != nil { + colexecutils.AccountForMetadata(s.allocator, msg.meta) s.bufferedMeta = append(s.bufferedMeta, msg.meta...) continue } @@ -466,7 +472,14 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada // Done. s.setState(parallelUnorderedSynchronizerStateDone) - return s.bufferedMeta + bufferedMeta := s.bufferedMeta + // Eagerly lose the reference to the metadata since it might be of + // non-trivial footprint. + s.bufferedMeta = nil + // The caller takes ownership of the metadata, so we can release all of the + // allocations. + s.allocator.ReleaseAll() + return bufferedMeta } // Close is part of the colexecop.ClosableOperator interface. diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go index b7c3c4e676ac..fdc9d98a7621 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go @@ -126,7 +126,7 @@ func TestParallelUnorderedSynchronizer(t *testing.T) { ctx, cancelFn := context.WithCancel(context.Background()) var wg sync.WaitGroup - s := NewParallelUnorderedSynchronizer(inputs, &wg) + s := NewParallelUnorderedSynchronizer(testAllocator, inputs, &wg) s.LocalPlan = true s.Init(ctx) @@ -228,7 +228,7 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) { } var wg sync.WaitGroup - s := NewParallelUnorderedSynchronizer(inputs, &wg) + s := NewParallelUnorderedSynchronizer(testAllocator, inputs, &wg) s.Init(ctx) for { if err := colexecerror.CatchVectorizedRuntimeError(func() { _ = s.Next() }); err != nil { @@ -280,7 +280,7 @@ func TestParallelUnorderedSyncClosesInputs(t *testing.T) { // Create and initialize (but don't run) the synchronizer. var wg sync.WaitGroup - s := NewParallelUnorderedSynchronizer(inputs, &wg) + s := NewParallelUnorderedSynchronizer(testAllocator, inputs, &wg) err := colexecerror.CatchVectorizedRuntimeError(func() { s.Init(ctx) }) require.NotNil(t, err) require.True(t, strings.Contains(err.Error(), injectedPanicMsg)) @@ -304,7 +304,7 @@ func BenchmarkParallelUnorderedSynchronizer(b *testing.B) { } var wg sync.WaitGroup ctx, cancelFn := context.WithCancel(context.Background()) - s := NewParallelUnorderedSynchronizer(inputs, &wg) + s := NewParallelUnorderedSynchronizer(testAllocator, inputs, &wg) s.Init(ctx) b.SetBytes(8 * int64(coldata.BatchSize())) b.ResetTimer() diff --git a/pkg/sql/colexec/proj_utils_test.go b/pkg/sql/colexec/proj_utils_test.go index c168d959dd87..ca18c8ba74e3 100644 --- a/pkg/sql/colexec/proj_utils_test.go +++ b/pkg/sql/colexec/proj_utils_test.go @@ -76,6 +76,7 @@ func assertProjOpAgainstRowByRow( // column of the projection operator. op := colexecbase.NewSimpleProjectOp(projOp, len(inputTypes)+1, []uint32{uint32(len(inputTypes))}) materializer := NewMaterializer( + nil, /* allocator */ flowCtx, 1, /* processorID */ colexecargs.OpWithMetaInfo{Root: op}, diff --git a/pkg/sql/colexec/types_integration_test.go b/pkg/sql/colexec/types_integration_test.go index 5cac6dfa2d42..28d14957f71b 100644 --- a/pkg/sql/colexec/types_integration_test.go +++ b/pkg/sql/colexec/types_integration_test.go @@ -87,6 +87,7 @@ func TestSQLTypesIntegration(t *testing.T) { arrowOp := newArrowTestOperator(columnarizer, c, r, typs) materializer := NewMaterializer( + nil, /* allocator */ flowCtx, 1, /* processorID */ colexecargs.OpWithMetaInfo{Root: arrowOp}, diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index 67d7ea657a52..c63656059152 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -20,6 +20,7 @@ import ( "github.com/apache/arrow/go/arrow/array" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/colserde" + "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils" "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" @@ -394,6 +395,7 @@ func (i *Inbox) Next() coldata.Batch { // to keep errors unchanged (e.g. roachpb.ErrPriority() will // be called on each error in the DistSQLReceiver). i.bufferedMeta = append(i.bufferedMeta, meta) + colexecutils.AccountForMetadata(i.allocator, i.bufferedMeta[len(i.bufferedMeta)-1:]) } } if receivedErr != nil { @@ -483,7 +485,15 @@ func (i *Inbox) sendDrainSignal(ctx context.Context) error { // not be called concurrently with Next. func (i *Inbox) DrainMeta() []execinfrapb.ProducerMetadata { allMeta := i.bufferedMeta - i.bufferedMeta = i.bufferedMeta[:0] + // Eagerly lose the reference to the metadata since it might be of + // non-trivial footprint. + i.bufferedMeta = nil + // We also no longer need the scratch batch. + i.scratch.b = nil + // The allocator tracks the memory usage for a few things (the scratch batch + // as well as the metadata), and when this function returns, we no longer + // reference any of those, so we can release all of the allocations. + defer i.allocator.ReleaseAll() if i.done { // Next exhausted the stream of metadata. diff --git a/pkg/sql/colflow/routers.go b/pkg/sql/colflow/routers.go index 9410cfa325b4..c8cc2d5a68b1 100644 --- a/pkg/sql/colflow/routers.go +++ b/pkg/sql/colflow/routers.go @@ -435,8 +435,6 @@ type HashRouter struct { unblockedEventsChan <-chan struct{} numBlockedOutputs int - bufferedMeta []execinfrapb.ProducerMetadata - // atomics is shared state between the Run goroutine and any routerOutput // goroutines that call drainMeta. atomics struct { @@ -627,6 +625,8 @@ func (r *HashRouter) Run(ctx context.Context) { }); err != nil { r.cancelOutputs(ctx, err) } + + var bufferedMeta []execinfrapb.ProducerMetadata if inputInitialized { // Retrieving stats and draining the metadata is only safe if the input // to the hash router was properly initialized. @@ -635,14 +635,14 @@ func (r *HashRouter) Run(ctx context.Context) { span.RecordStructured(s.GetStats()) } if meta := execinfra.GetTraceDataAsMetadata(span); meta != nil { - r.bufferedMeta = append(r.bufferedMeta, *meta) + bufferedMeta = append(bufferedMeta, *meta) } } - r.bufferedMeta = append(r.bufferedMeta, r.inputMetaInfo.MetadataSources.DrainMeta()...) + bufferedMeta = append(bufferedMeta, r.inputMetaInfo.MetadataSources.DrainMeta()...) } // Non-blocking send of metadata so that one of the outputs can return it // in DrainMeta. - r.waitForMetadata <- r.bufferedMeta + r.waitForMetadata <- bufferedMeta close(r.waitForMetadata) r.inputMetaInfo.ToClose.CloseAndLogOnErr(ctx, "hash router") @@ -687,7 +687,6 @@ func (r *HashRouter) resetForTests(ctx context.Context) { r.setDrainState(hashRouterDrainStateRunning) r.waitForMetadata = make(chan []execinfrapb.ProducerMetadata, 1) r.atomics.numDrainedOutputs = 0 - r.bufferedMeta = nil r.numBlockedOutputs = 0 for moreToRead := true; moreToRead; { select { diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 191f91f1d5b9..adbf92696383 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -967,7 +967,8 @@ func (s *vectorizedFlowCreator) setupInput( // Note that if we have opt == flowinfra.FuseAggressively, then we // must use the serial unordered sync above in order to remove any // concurrency. - sync := colexec.NewParallelUnorderedSynchronizer(inputStreamOps, s.waitGroup) + allocator := colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory) + sync := colexec.NewParallelUnorderedSynchronizer(allocator, inputStreamOps, s.waitGroup) sync.LocalPlan = flowCtx.Local opWithMetaInfo = colexecargs.OpWithMetaInfo{ Root: sync, @@ -1074,6 +1075,7 @@ func (s *vectorizedFlowCreator) setupOutput( } } else { input = colexec.NewMaterializerNoEvalCtxCopy( + colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory), flowCtx, pspec.ProcessorID, opWithMetaInfo, @@ -1168,9 +1170,12 @@ func (s *vectorizedFlowCreator) setupFlow( } args := &colexecargs.NewColOperatorArgs{ - Spec: pspec, - Inputs: inputs, - StreamingMemAccount: s.monitorRegistry.NewStreamingMemAccount(flowCtx), + Spec: pspec, + Inputs: inputs, + StreamingMemAccount: s.monitorRegistry.NewStreamingMemAccount(flowCtx), + StreamingMemAccFactory: func() *mon.BoundAccount { + return s.monitorRegistry.NewStreamingMemAccount(flowCtx) + }, ProcessorConstructor: rowexec.NewProcessor, LocalProcessors: localProcessors, DiskQueueCfg: s.diskQueueCfg, diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index d7dcfc150d6c..b37f76194543 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -230,7 +230,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { }, ) } - synchronizer := colexec.NewParallelUnorderedSynchronizer(synchronizerInputs, &wg) + synchronizer := colexec.NewParallelUnorderedSynchronizer(testAllocator, synchronizerInputs, &wg) inputMetadataSource := colexecop.MetadataSource(synchronizer) flowID := execinfrapb.FlowID{UUID: uuid.MakeV4()} @@ -369,6 +369,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { // coordinator. runFlowCoordinator := func() *colflow.FlowCoordinator { materializer := colexec.NewMaterializer( + nil, /* allocator */ flowCtx, 1, /* processorID */ inputInfo, diff --git a/pkg/sql/colflow/vectorized_panic_propagation_test.go b/pkg/sql/colflow/vectorized_panic_propagation_test.go index 7fcfcdd5fd00..b00e6adf828d 100644 --- a/pkg/sql/colflow/vectorized_panic_propagation_test.go +++ b/pkg/sql/colflow/vectorized_panic_propagation_test.go @@ -53,6 +53,7 @@ func TestVectorizedInternalPanic(t *testing.T) { col := colexec.NewBufferingColumnarizer(testAllocator, &flowCtx, 0 /* processorID */, input) vee := newTestVectorizedInternalPanicEmitter(col) mat := colexec.NewMaterializer( + nil, /* allocator */ &flowCtx, 1, /* processorID */ colexecargs.OpWithMetaInfo{Root: vee}, @@ -88,6 +89,7 @@ func TestNonVectorizedPanicPropagation(t *testing.T) { col := colexec.NewBufferingColumnarizer(testAllocator, &flowCtx, 0 /* processorID */, input) nvee := newTestNonVectorizedPanicEmitter(col) mat := colexec.NewMaterializer( + nil, /* allocator */ &flowCtx, 1, /* processorID */ colexecargs.OpWithMetaInfo{Root: nvee}, diff --git a/pkg/sql/distsql/columnar_utils_test.go b/pkg/sql/distsql/columnar_utils_test.go index bfdce8f5e002..b6a1a871bf63 100644 --- a/pkg/sql/distsql/columnar_utils_test.go +++ b/pkg/sql/distsql/columnar_utils_test.go @@ -162,6 +162,7 @@ func verifyColOperator(t *testing.T, args verifyColOperatorArgs) error { } outColOp := colexec.NewMaterializer( + nil, /* allocator */ flowCtx, int32(len(args.inputs))+2, result.OpWithMetaInfo, diff --git a/pkg/sql/distsql/vectorized_panic_propagation_test.go b/pkg/sql/distsql/vectorized_panic_propagation_test.go index 20c25b4ac871..a1abed54165e 100644 --- a/pkg/sql/distsql/vectorized_panic_propagation_test.go +++ b/pkg/sql/distsql/vectorized_panic_propagation_test.go @@ -55,6 +55,7 @@ func TestNonVectorizedPanicDoesntHangServer(t *testing.T) { flow := colflow.NewVectorizedFlow(base) mat := colexec.NewMaterializer( + nil, /* allocator */ &flowCtx, 0, /* processorID */ colexecargs.OpWithMetaInfo{Root: &colexecop.CallbackOperator{ diff --git a/pkg/sql/sem/eval/eval_test/eval_test.go b/pkg/sql/sem/eval/eval_test/eval_test.go index 06e8dccceb6c..1f1be25f4fd6 100644 --- a/pkg/sql/sem/eval/eval_test/eval_test.go +++ b/pkg/sql/sem/eval/eval_test/eval_test.go @@ -200,6 +200,7 @@ func TestEval(t *testing.T) { require.NoError(t, err) mat := colexec.NewMaterializer( + nil, /* allocator */ flowCtx, 0, /* processorID */ result.OpWithMetaInfo,