From de35ca1b2dc6ded31827cc692a2dae133e7399fd Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 28 Apr 2022 00:18:02 +0000 Subject: [PATCH] colexec: fix sort chunks with disk spilling in very rare circumstances This commit fixes a long-standing but very rare bug which could result in some rows being dropped when sort chunks ("segmented sort") spills to disk. The root cause is that a deselector operator is placed on top of the input to the sort chunks op (because its "chunker" spooler assumes no selection vector on batches), and that deselector uses the same allocator as the sort chunks. If the allocator's budget is used up, then an error is thrown, and it is caught by the disk-spilling infrastructure that is wrapping this whole `sort chunks -> chunker -> deselector` chain; the error is then suppressed, and spilling to disk occurs. However, crucially, it was always assumed that the error occurred in `chunker`, so only that component knows how to properly perform the fallover. If the error occurs in the deselector, the deselector might end up losing a single input batch. We worked around this by making a fake allocation in the deselector before reading the input batch. However, if the stars align, and the error occurs _after_ reading the input batch in the deselector, that input batch will be lost, and we might get incorrect results. For the bug to occur a couple of conditions need to be met: 1. The "memory budget exceeded" error must occur for the sort chunks operation. It is far more likely that it will occur in the "chunker" because that component can buffer an arbitrarily large number of tuples and because we did make that fake allocation. 2. The input operator to the chain must be producing batches with selection vectors on top - if this is not the case, then the deselector is a noop. An example of such an input is a table reader with a filter on top. The fix is quite simple - use a separate allocator for the deselector that has an unlimited budget. This allows us to still properly track the memory usage of an extra batch created in the deselector without it running into these difficulties with disk spilling. This also makes it so that if a "memory budget exceeded" error does occur in the deselector (which is possible if `--max-sql-memory` has been used up), it will not be caught by the disk-spilling infrastructure and will be propagate to the user - which is the expected and desired behavior in such a scenario. There is no explicit regression test for this since our existing unit tests already exercise this scenario once the fake allocation in the deselector is removed. Release note (bug fix): Previously, in very rare circumstances CockroachDB could incorrectly evaluate queries with ORDER BY clause when the prefix of ordering was already provided by the index ordering of the scanned table. --- pkg/sql/colexec/colbuilder/execplan.go | 12 ++++-- pkg/sql/colexec/colexecutils/deselector.go | 38 +++++++++---------- pkg/sql/colexec/distinct_test.go | 4 +- pkg/sql/colexec/partially_ordered_distinct.go | 3 +- pkg/sql/colexec/sort_chunks.go | 6 ++- pkg/sql/colexec/sort_chunks_test.go | 8 ++-- pkg/sql/colflow/colrpc/outbox.go | 26 ++++++------- 7 files changed, 52 insertions(+), 45 deletions(-) diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 5208070afdb2..959b9e7be448 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -375,13 +375,19 @@ func (r opResult) createDiskBackedSort( } else if matchLen > 0 { // The input is already partially ordered. Use a chunks sorter to avoid // loading all the rows into memory. + opName := opNamePrefix + "sort-chunks" + deselectorUnlimitedAllocator := colmem.NewAllocator( + ctx, args.MonitorRegistry.CreateUnlimitedMemAccount( + ctx, flowCtx, opName, processorID, + ), factory, + ) var sortChunksMemAccount *mon.BoundAccount sortChunksMemAccount, sorterMemMonitorName = args.MonitorRegistry.CreateMemAccountForSpillStrategyWithLimit( - ctx, flowCtx, spoolMemLimit, opNamePrefix+"sort-chunks", processorID, + ctx, flowCtx, spoolMemLimit, opName, processorID, ) inMemorySorter = colexec.NewSortChunks( - colmem.NewAllocator(ctx, sortChunksMemAccount, factory), input, inputTypes, - ordering.Columns, int(matchLen), maxOutputBatchMemSize, + deselectorUnlimitedAllocator, colmem.NewAllocator(ctx, sortChunksMemAccount, factory), + input, inputTypes, ordering.Columns, int(matchLen), maxOutputBatchMemSize, ) } else { // No optimizations possible. Default to the standard sort operator. diff --git a/pkg/sql/colexec/colexecutils/deselector.go b/pkg/sql/colexec/colexecutils/deselector.go index 462f45417028..9d3e87e3ef41 100644 --- a/pkg/sql/colexec/colexecutils/deselector.go +++ b/pkg/sql/colexec/colexecutils/deselector.go @@ -26,8 +26,8 @@ import ( type deselectorOp struct { colexecop.OneInputHelper colexecop.NonExplainable - allocator *colmem.Allocator - inputTypes []*types.T + unlimitedAllocator *colmem.Allocator + inputTypes []*types.T output coldata.Batch } @@ -36,39 +36,35 @@ var _ colexecop.Operator = &deselectorOp{} // NewDeselectorOp creates a new deselector operator on the given input // operator with the given column types. +// +// The provided allocator must be derived from an unlimited memory monitor since +// the deselectorOp cannot spill to disk and a "memory budget exceeded" error +// might be caught by the higher-level diskSpiller which would result in losing +// some query results. func NewDeselectorOp( - allocator *colmem.Allocator, input colexecop.Operator, typs []*types.T, + unlimitedAllocator *colmem.Allocator, input colexecop.Operator, typs []*types.T, ) colexecop.Operator { return &deselectorOp{ - OneInputHelper: colexecop.MakeOneInputHelper(input), - allocator: allocator, - inputTypes: typs, + OneInputHelper: colexecop.MakeOneInputHelper(input), + unlimitedAllocator: unlimitedAllocator, + inputTypes: typs, } } func (p *deselectorOp) Next() coldata.Batch { - // deselectorOp should *not* limit the capacities of the returned batches, - // so we don't use a memory limit here. It is up to the wrapped operator to - // limit the size of batches based on the memory footprint. - const maxBatchMemSize = math.MaxInt64 - // TODO(yuzefovich): this allocation is only needed in order to appease the - // tests of the external sorter with forced disk spilling (if we don't do - // this, an OOM error occurs during ResetMaybeReallocate call below at - // which point we have already received a batch from the input and it'll - // get lost because deselectorOp doesn't support fall-over to the - // disk-backed infrastructure). - p.output, _ = p.allocator.ResetMaybeReallocate( - p.inputTypes, p.output, 1 /* minCapacity */, maxBatchMemSize, - ) batch := p.Input.Next() if batch.Selection() == nil || batch.Length() == 0 { return batch } - p.output, _ = p.allocator.ResetMaybeReallocate( + // deselectorOp should *not* limit the capacities of the returned batches, + // so we don't use a memory limit here. It is up to the wrapped operator to + // limit the size of batches based on the memory footprint. + const maxBatchMemSize = math.MaxInt64 + p.output, _ = p.unlimitedAllocator.ResetMaybeReallocate( p.inputTypes, p.output, batch.Length(), maxBatchMemSize, ) sel := batch.Selection() - p.allocator.PerformOperation(p.output.ColVecs(), func() { + p.unlimitedAllocator.PerformOperation(p.output.ColVecs(), func() { for i := range p.inputTypes { toCol := p.output.ColVec(i) fromCol := batch.ColVec(i) diff --git a/pkg/sql/colexec/distinct_test.go b/pkg/sql/colexec/distinct_test.go index c8b11219ab6c..e0fc76ef8a23 100644 --- a/pkg/sql/colexec/distinct_test.go +++ b/pkg/sql/colexec/distinct_test.go @@ -405,7 +405,7 @@ func TestDistinct(t *testing.T) { } tc.runTests(t, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { return newPartiallyOrderedDistinct( - testAllocator, input[0], tc.distinctCols, orderedCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup, + testAllocator, testAllocator, input[0], tc.distinctCols, orderedCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup, ) }) } @@ -630,7 +630,7 @@ func BenchmarkDistinct(b *testing.B) { return NewUnorderedDistinct(allocator, input, distinctCols, typs, false /* nullsAreDistinct */, "" /* errorOnDup */), nil }, func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error) { - return newPartiallyOrderedDistinct(allocator, input, distinctCols, distinctCols[:numOrderedCols], typs, false /* nullsAreDistinct */, "" /* errorOnDup */) + return newPartiallyOrderedDistinct(allocator, allocator, input, distinctCols, distinctCols[:numOrderedCols], typs, false /* nullsAreDistinct */, "" /* errorOnDup */) }, func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error) { return colexecbase.NewOrderedDistinct(input, distinctCols, typs, false /* nullsAreDistinct */, "" /* errorOnDup */), nil diff --git a/pkg/sql/colexec/partially_ordered_distinct.go b/pkg/sql/colexec/partially_ordered_distinct.go index dc1bf3c83b65..685fe15f4a75 100644 --- a/pkg/sql/colexec/partially_ordered_distinct.go +++ b/pkg/sql/colexec/partially_ordered_distinct.go @@ -26,6 +26,7 @@ import ( // distinct columns when we have partial ordering on some of the distinct // columns. func newPartiallyOrderedDistinct( + unlimitedAllocator *colmem.Allocator, allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, @@ -39,7 +40,7 @@ func newPartiallyOrderedDistinct( "partially ordered distinct wrongfully planned: numDistinctCols=%d "+ "numOrderedCols=%d", len(distinctCols), len(orderedCols)) } - chunker := newChunker(allocator, input, typs, orderedCols, nullsAreDistinct) + chunker := newChunker(unlimitedAllocator, allocator, input, typs, orderedCols, nullsAreDistinct) chunkerOperator := newChunkerOperator(allocator, chunker, typs) // distinctUnorderedCols will contain distinct columns that are not present // among orderedCols. The unordered distinct operator will use these columns diff --git a/pkg/sql/colexec/sort_chunks.go b/pkg/sql/colexec/sort_chunks.go index 41e854f75a13..e379250db2eb 100644 --- a/pkg/sql/colexec/sort_chunks.go +++ b/pkg/sql/colexec/sort_chunks.go @@ -29,6 +29,7 @@ import ( // the columns in the input operator. The input tuples must be sorted on first // matchLen columns. func NewSortChunks( + unlimitedAllocator *colmem.Allocator, allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, @@ -46,7 +47,7 @@ func NewSortChunks( for i := range alreadySortedCols { alreadySortedCols[i] = orderingCols[i].ColIdx } - chunker := newChunker(allocator, input, inputTypes, alreadySortedCols, false /* nullsAreDistinct */) + chunker := newChunker(unlimitedAllocator, allocator, input, inputTypes, alreadySortedCols, false /* nullsAreDistinct */) sorter := newSorter(allocator, chunker, inputTypes, orderingCols[matchLen:], maxOutputBatchMemSize) return &sortChunksOp{allocator: allocator, input: chunker, sorter: sorter} } @@ -256,6 +257,7 @@ type chunker struct { var _ spooler = &chunker{} func newChunker( + unlimitedAllocator *colmem.Allocator, allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, @@ -266,7 +268,7 @@ func newChunker( for i, col := range alreadySortedCols { partitioners[i] = newPartitioner(inputTypes[col], nullsAreDistinct) } - deselector := colexecutils.NewDeselectorOp(allocator, input, inputTypes) + deselector := colexecutils.NewDeselectorOp(unlimitedAllocator, input, inputTypes) return &chunker{ OneInputNode: colexecop.NewOneInputNode(deselector), allocator: allocator, diff --git a/pkg/sql/colexec/sort_chunks_test.go b/pkg/sql/colexec/sort_chunks_test.go index 2702e32c4b3d..2428dce424ce 100644 --- a/pkg/sql/colexec/sort_chunks_test.go +++ b/pkg/sql/colexec/sort_chunks_test.go @@ -189,7 +189,7 @@ func TestSortChunks(t *testing.T) { for _, tc := range sortChunksTestCases { colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { - return NewSortChunks(testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, execinfra.DefaultMemoryLimit), nil + return NewSortChunks(testAllocator, testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, execinfra.DefaultMemoryLimit), nil }) } } @@ -231,7 +231,7 @@ func TestSortChunksRandomized(t *testing.T) { sort.Slice(expected, less(expected, ordCols)) colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{sortedTups}, expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) { - return NewSortChunks(testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil + return NewSortChunks(testAllocator, testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil }) } } @@ -244,7 +244,9 @@ func BenchmarkSortChunks(b *testing.B) { ctx := context.Background() sorterConstructors := []func(*colmem.Allocator, colexecop.Operator, []*types.T, []execinfrapb.Ordering_Column, int, int64) colexecop.Operator{ - NewSortChunks, + func(allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, matchLen int, maxOutputBatchMemSize int64) colexecop.Operator { + return NewSortChunks(allocator, allocator, input, inputTypes, orderingCols, matchLen, maxOutputBatchMemSize) + }, func(allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, _ int, maxOutputBatchMemSize int64) colexecop.Operator { return NewSorter(allocator, input, inputTypes, orderingCols, maxOutputBatchMemSize) }, diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index 7a2a45d3b7e8..98e4f4898593 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -55,9 +55,9 @@ type Outbox struct { typs []*types.T - allocator *colmem.Allocator - converter *colserde.ArrowBatchConverter - serializer *colserde.RecordBatchSerializer + unlimitedAllocator *colmem.Allocator + converter *colserde.ArrowBatchConverter + serializer *colserde.RecordBatchSerializer // draining is an atomic that represents whether the Outbox is draining. draining uint32 @@ -83,7 +83,7 @@ type Outbox struct { // - getStats, when non-nil, returns all of the execution statistics of the // operators that are in the same tree as this Outbox. func NewOutbox( - allocator *colmem.Allocator, + unlimitedAllocator *colmem.Allocator, input colexecargs.OpWithMetaInfo, typs []*types.T, getStats func() []*execinfrapb.ComponentStats, @@ -99,13 +99,13 @@ func NewOutbox( o := &Outbox{ // Add a deselector as selection vectors are not serialized (nor should they // be). - OneInputNode: colexecop.NewOneInputNode(colexecutils.NewDeselectorOp(allocator, input.Root, typs)), - inputMetaInfo: input, - typs: typs, - allocator: allocator, - converter: c, - serializer: s, - getStats: getStats, + OneInputNode: colexecop.NewOneInputNode(colexecutils.NewDeselectorOp(unlimitedAllocator, input.Root, typs)), + inputMetaInfo: input, + typs: typs, + unlimitedAllocator: unlimitedAllocator, + converter: c, + serializer: s, + getStats: getStats, } o.scratch.buf = &bytes.Buffer{} o.scratch.msg = &execinfrapb.ProducerMessage{} @@ -120,7 +120,7 @@ func (o *Outbox) close(ctx context.Context) { // registered with the allocator (the allocator is shared by the outbox and // the deselector). o.Input = nil - o.allocator.ReleaseMemory(o.allocator.Used()) + o.unlimitedAllocator.ReleaseMemory(o.unlimitedAllocator.Used()) o.inputMetaInfo.ToClose.CloseAndLogOnErr(ctx, "outbox") } @@ -312,7 +312,7 @@ func (o *Outbox) sendBatches( // Note that because we never truncate the buffer, we are only // adjusting the memory usage whenever the buffer's capacity // increases (if it didn't increase, this call becomes a noop). - o.allocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap)) + o.unlimitedAllocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap)) o.scratch.msg.Data.RawBytes = o.scratch.buf.Bytes() // o.scratch.msg can be reused as soon as Send returns since it returns as