diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 5208070afdb2..959b9e7be448 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -375,13 +375,19 @@ func (r opResult) createDiskBackedSort( } else if matchLen > 0 { // The input is already partially ordered. Use a chunks sorter to avoid // loading all the rows into memory. + opName := opNamePrefix + "sort-chunks" + deselectorUnlimitedAllocator := colmem.NewAllocator( + ctx, args.MonitorRegistry.CreateUnlimitedMemAccount( + ctx, flowCtx, opName, processorID, + ), factory, + ) var sortChunksMemAccount *mon.BoundAccount sortChunksMemAccount, sorterMemMonitorName = args.MonitorRegistry.CreateMemAccountForSpillStrategyWithLimit( - ctx, flowCtx, spoolMemLimit, opNamePrefix+"sort-chunks", processorID, + ctx, flowCtx, spoolMemLimit, opName, processorID, ) inMemorySorter = colexec.NewSortChunks( - colmem.NewAllocator(ctx, sortChunksMemAccount, factory), input, inputTypes, - ordering.Columns, int(matchLen), maxOutputBatchMemSize, + deselectorUnlimitedAllocator, colmem.NewAllocator(ctx, sortChunksMemAccount, factory), + input, inputTypes, ordering.Columns, int(matchLen), maxOutputBatchMemSize, ) } else { // No optimizations possible. Default to the standard sort operator. diff --git a/pkg/sql/colexec/colexecutils/deselector.go b/pkg/sql/colexec/colexecutils/deselector.go index 462f45417028..9d3e87e3ef41 100644 --- a/pkg/sql/colexec/colexecutils/deselector.go +++ b/pkg/sql/colexec/colexecutils/deselector.go @@ -26,8 +26,8 @@ import ( type deselectorOp struct { colexecop.OneInputHelper colexecop.NonExplainable - allocator *colmem.Allocator - inputTypes []*types.T + unlimitedAllocator *colmem.Allocator + inputTypes []*types.T output coldata.Batch } @@ -36,39 +36,35 @@ var _ colexecop.Operator = &deselectorOp{} // NewDeselectorOp creates a new deselector operator on the given input // operator with the given column types. +// +// The provided allocator must be derived from an unlimited memory monitor since +// the deselectorOp cannot spill to disk and a "memory budget exceeded" error +// might be caught by the higher-level diskSpiller which would result in losing +// some query results. func NewDeselectorOp( - allocator *colmem.Allocator, input colexecop.Operator, typs []*types.T, + unlimitedAllocator *colmem.Allocator, input colexecop.Operator, typs []*types.T, ) colexecop.Operator { return &deselectorOp{ - OneInputHelper: colexecop.MakeOneInputHelper(input), - allocator: allocator, - inputTypes: typs, + OneInputHelper: colexecop.MakeOneInputHelper(input), + unlimitedAllocator: unlimitedAllocator, + inputTypes: typs, } } func (p *deselectorOp) Next() coldata.Batch { - // deselectorOp should *not* limit the capacities of the returned batches, - // so we don't use a memory limit here. It is up to the wrapped operator to - // limit the size of batches based on the memory footprint. - const maxBatchMemSize = math.MaxInt64 - // TODO(yuzefovich): this allocation is only needed in order to appease the - // tests of the external sorter with forced disk spilling (if we don't do - // this, an OOM error occurs during ResetMaybeReallocate call below at - // which point we have already received a batch from the input and it'll - // get lost because deselectorOp doesn't support fall-over to the - // disk-backed infrastructure). - p.output, _ = p.allocator.ResetMaybeReallocate( - p.inputTypes, p.output, 1 /* minCapacity */, maxBatchMemSize, - ) batch := p.Input.Next() if batch.Selection() == nil || batch.Length() == 0 { return batch } - p.output, _ = p.allocator.ResetMaybeReallocate( + // deselectorOp should *not* limit the capacities of the returned batches, + // so we don't use a memory limit here. It is up to the wrapped operator to + // limit the size of batches based on the memory footprint. + const maxBatchMemSize = math.MaxInt64 + p.output, _ = p.unlimitedAllocator.ResetMaybeReallocate( p.inputTypes, p.output, batch.Length(), maxBatchMemSize, ) sel := batch.Selection() - p.allocator.PerformOperation(p.output.ColVecs(), func() { + p.unlimitedAllocator.PerformOperation(p.output.ColVecs(), func() { for i := range p.inputTypes { toCol := p.output.ColVec(i) fromCol := batch.ColVec(i) diff --git a/pkg/sql/colexec/distinct_test.go b/pkg/sql/colexec/distinct_test.go index c8b11219ab6c..e0fc76ef8a23 100644 --- a/pkg/sql/colexec/distinct_test.go +++ b/pkg/sql/colexec/distinct_test.go @@ -405,7 +405,7 @@ func TestDistinct(t *testing.T) { } tc.runTests(t, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { return newPartiallyOrderedDistinct( - testAllocator, input[0], tc.distinctCols, orderedCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup, + testAllocator, testAllocator, input[0], tc.distinctCols, orderedCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup, ) }) } @@ -630,7 +630,7 @@ func BenchmarkDistinct(b *testing.B) { return NewUnorderedDistinct(allocator, input, distinctCols, typs, false /* nullsAreDistinct */, "" /* errorOnDup */), nil }, func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error) { - return newPartiallyOrderedDistinct(allocator, input, distinctCols, distinctCols[:numOrderedCols], typs, false /* nullsAreDistinct */, "" /* errorOnDup */) + return newPartiallyOrderedDistinct(allocator, allocator, input, distinctCols, distinctCols[:numOrderedCols], typs, false /* nullsAreDistinct */, "" /* errorOnDup */) }, func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error) { return colexecbase.NewOrderedDistinct(input, distinctCols, typs, false /* nullsAreDistinct */, "" /* errorOnDup */), nil diff --git a/pkg/sql/colexec/partially_ordered_distinct.go b/pkg/sql/colexec/partially_ordered_distinct.go index dc1bf3c83b65..685fe15f4a75 100644 --- a/pkg/sql/colexec/partially_ordered_distinct.go +++ b/pkg/sql/colexec/partially_ordered_distinct.go @@ -26,6 +26,7 @@ import ( // distinct columns when we have partial ordering on some of the distinct // columns. func newPartiallyOrderedDistinct( + unlimitedAllocator *colmem.Allocator, allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, @@ -39,7 +40,7 @@ func newPartiallyOrderedDistinct( "partially ordered distinct wrongfully planned: numDistinctCols=%d "+ "numOrderedCols=%d", len(distinctCols), len(orderedCols)) } - chunker := newChunker(allocator, input, typs, orderedCols, nullsAreDistinct) + chunker := newChunker(unlimitedAllocator, allocator, input, typs, orderedCols, nullsAreDistinct) chunkerOperator := newChunkerOperator(allocator, chunker, typs) // distinctUnorderedCols will contain distinct columns that are not present // among orderedCols. The unordered distinct operator will use these columns diff --git a/pkg/sql/colexec/sort_chunks.go b/pkg/sql/colexec/sort_chunks.go index 41e854f75a13..e379250db2eb 100644 --- a/pkg/sql/colexec/sort_chunks.go +++ b/pkg/sql/colexec/sort_chunks.go @@ -29,6 +29,7 @@ import ( // the columns in the input operator. The input tuples must be sorted on first // matchLen columns. func NewSortChunks( + unlimitedAllocator *colmem.Allocator, allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, @@ -46,7 +47,7 @@ func NewSortChunks( for i := range alreadySortedCols { alreadySortedCols[i] = orderingCols[i].ColIdx } - chunker := newChunker(allocator, input, inputTypes, alreadySortedCols, false /* nullsAreDistinct */) + chunker := newChunker(unlimitedAllocator, allocator, input, inputTypes, alreadySortedCols, false /* nullsAreDistinct */) sorter := newSorter(allocator, chunker, inputTypes, orderingCols[matchLen:], maxOutputBatchMemSize) return &sortChunksOp{allocator: allocator, input: chunker, sorter: sorter} } @@ -256,6 +257,7 @@ type chunker struct { var _ spooler = &chunker{} func newChunker( + unlimitedAllocator *colmem.Allocator, allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, @@ -266,7 +268,7 @@ func newChunker( for i, col := range alreadySortedCols { partitioners[i] = newPartitioner(inputTypes[col], nullsAreDistinct) } - deselector := colexecutils.NewDeselectorOp(allocator, input, inputTypes) + deselector := colexecutils.NewDeselectorOp(unlimitedAllocator, input, inputTypes) return &chunker{ OneInputNode: colexecop.NewOneInputNode(deselector), allocator: allocator, diff --git a/pkg/sql/colexec/sort_chunks_test.go b/pkg/sql/colexec/sort_chunks_test.go index 2702e32c4b3d..2428dce424ce 100644 --- a/pkg/sql/colexec/sort_chunks_test.go +++ b/pkg/sql/colexec/sort_chunks_test.go @@ -189,7 +189,7 @@ func TestSortChunks(t *testing.T) { for _, tc := range sortChunksTestCases { colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { - return NewSortChunks(testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, execinfra.DefaultMemoryLimit), nil + return NewSortChunks(testAllocator, testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, execinfra.DefaultMemoryLimit), nil }) } } @@ -231,7 +231,7 @@ func TestSortChunksRandomized(t *testing.T) { sort.Slice(expected, less(expected, ordCols)) colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{sortedTups}, expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { - return NewSortChunks(testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil + return NewSortChunks(testAllocator, testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil }) } } @@ -244,7 +244,9 @@ func BenchmarkSortChunks(b *testing.B) { ctx := context.Background() sorterConstructors := []func(*colmem.Allocator, colexecop.Operator, []*types.T, []execinfrapb.Ordering_Column, int, int64) colexecop.Operator{ - NewSortChunks, + func(allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, matchLen int, maxOutputBatchMemSize int64) colexecop.Operator { + return NewSortChunks(allocator, allocator, input, inputTypes, orderingCols, matchLen, maxOutputBatchMemSize) + }, func(allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, _ int, maxOutputBatchMemSize int64) colexecop.Operator { return NewSorter(allocator, input, inputTypes, orderingCols, maxOutputBatchMemSize) }, diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index 7a2a45d3b7e8..98e4f4898593 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -55,9 +55,9 @@ type Outbox struct { typs []*types.T - allocator *colmem.Allocator - converter *colserde.ArrowBatchConverter - serializer *colserde.RecordBatchSerializer + unlimitedAllocator *colmem.Allocator + converter *colserde.ArrowBatchConverter + serializer *colserde.RecordBatchSerializer // draining is an atomic that represents whether the Outbox is draining. draining uint32 @@ -83,7 +83,7 @@ type Outbox struct { // - getStats, when non-nil, returns all of the execution statistics of the // operators that are in the same tree as this Outbox. func NewOutbox( - allocator *colmem.Allocator, + unlimitedAllocator *colmem.Allocator, input colexecargs.OpWithMetaInfo, typs []*types.T, getStats func() []*execinfrapb.ComponentStats, @@ -99,13 +99,13 @@ func NewOutbox( o := &Outbox{ // Add a deselector as selection vectors are not serialized (nor should they // be). - OneInputNode: colexecop.NewOneInputNode(colexecutils.NewDeselectorOp(allocator, input.Root, typs)), - inputMetaInfo: input, - typs: typs, - allocator: allocator, - converter: c, - serializer: s, - getStats: getStats, + OneInputNode: colexecop.NewOneInputNode(colexecutils.NewDeselectorOp(unlimitedAllocator, input.Root, typs)), + inputMetaInfo: input, + typs: typs, + unlimitedAllocator: unlimitedAllocator, + converter: c, + serializer: s, + getStats: getStats, } o.scratch.buf = &bytes.Buffer{} o.scratch.msg = &execinfrapb.ProducerMessage{} @@ -120,7 +120,7 @@ func (o *Outbox) close(ctx context.Context) { // registered with the allocator (the allocator is shared by the outbox and // the deselector). o.Input = nil - o.allocator.ReleaseMemory(o.allocator.Used()) + o.unlimitedAllocator.ReleaseMemory(o.unlimitedAllocator.Used()) o.inputMetaInfo.ToClose.CloseAndLogOnErr(ctx, "outbox") } @@ -312,7 +312,7 @@ func (o *Outbox) sendBatches( // Note that because we never truncate the buffer, we are only // adjusting the memory usage whenever the buffer's capacity // increases (if it didn't increase, this call becomes a noop). - o.allocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap)) + o.unlimitedAllocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap)) o.scratch.msg.Data.RawBytes = o.scratch.buf.Bytes() // o.scratch.msg can be reused as soon as Send returns since it returns as