Skip to content

Commit

Permalink
Merge pull request #80715 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.1.0-80679

release-22.1.0: colexec: fix sort chunks with disk spilling in very rare circumstances
  • Loading branch information
yuzefovich authored May 2, 2022
2 parents 24ba4d7 + de35ca1 commit 120fbbb
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 45 deletions.
12 changes: 9 additions & 3 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,13 +375,19 @@ func (r opResult) createDiskBackedSort(
} else if matchLen > 0 {
// The input is already partially ordered. Use a chunks sorter to avoid
// loading all the rows into memory.
opName := opNamePrefix + "sort-chunks"
deselectorUnlimitedAllocator := colmem.NewAllocator(
ctx, args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, opName, processorID,
), factory,
)
var sortChunksMemAccount *mon.BoundAccount
sortChunksMemAccount, sorterMemMonitorName = args.MonitorRegistry.CreateMemAccountForSpillStrategyWithLimit(
ctx, flowCtx, spoolMemLimit, opNamePrefix+"sort-chunks", processorID,
ctx, flowCtx, spoolMemLimit, opName, processorID,
)
inMemorySorter = colexec.NewSortChunks(
colmem.NewAllocator(ctx, sortChunksMemAccount, factory), input, inputTypes,
ordering.Columns, int(matchLen), maxOutputBatchMemSize,
deselectorUnlimitedAllocator, colmem.NewAllocator(ctx, sortChunksMemAccount, factory),
input, inputTypes, ordering.Columns, int(matchLen), maxOutputBatchMemSize,
)
} else {
// No optimizations possible. Default to the standard sort operator.
Expand Down
38 changes: 17 additions & 21 deletions pkg/sql/colexec/colexecutils/deselector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
type deselectorOp struct {
colexecop.OneInputHelper
colexecop.NonExplainable
allocator *colmem.Allocator
inputTypes []*types.T
unlimitedAllocator *colmem.Allocator
inputTypes []*types.T

output coldata.Batch
}
Expand All @@ -36,39 +36,35 @@ var _ colexecop.Operator = &deselectorOp{}

// NewDeselectorOp creates a new deselector operator on the given input
// operator with the given column types.
//
// The provided allocator must be derived from an unlimited memory monitor since
// the deselectorOp cannot spill to disk and a "memory budget exceeded" error
// might be caught by the higher-level diskSpiller which would result in losing
// some query results.
func NewDeselectorOp(
allocator *colmem.Allocator, input colexecop.Operator, typs []*types.T,
unlimitedAllocator *colmem.Allocator, input colexecop.Operator, typs []*types.T,
) colexecop.Operator {
return &deselectorOp{
OneInputHelper: colexecop.MakeOneInputHelper(input),
allocator: allocator,
inputTypes: typs,
OneInputHelper: colexecop.MakeOneInputHelper(input),
unlimitedAllocator: unlimitedAllocator,
inputTypes: typs,
}
}

func (p *deselectorOp) Next() coldata.Batch {
// deselectorOp should *not* limit the capacities of the returned batches,
// so we don't use a memory limit here. It is up to the wrapped operator to
// limit the size of batches based on the memory footprint.
const maxBatchMemSize = math.MaxInt64
// TODO(yuzefovich): this allocation is only needed in order to appease the
// tests of the external sorter with forced disk spilling (if we don't do
// this, an OOM error occurs during ResetMaybeReallocate call below at
// which point we have already received a batch from the input and it'll
// get lost because deselectorOp doesn't support fall-over to the
// disk-backed infrastructure).
p.output, _ = p.allocator.ResetMaybeReallocate(
p.inputTypes, p.output, 1 /* minCapacity */, maxBatchMemSize,
)
batch := p.Input.Next()
if batch.Selection() == nil || batch.Length() == 0 {
return batch
}
p.output, _ = p.allocator.ResetMaybeReallocate(
// deselectorOp should *not* limit the capacities of the returned batches,
// so we don't use a memory limit here. It is up to the wrapped operator to
// limit the size of batches based on the memory footprint.
const maxBatchMemSize = math.MaxInt64
p.output, _ = p.unlimitedAllocator.ResetMaybeReallocate(
p.inputTypes, p.output, batch.Length(), maxBatchMemSize,
)
sel := batch.Selection()
p.allocator.PerformOperation(p.output.ColVecs(), func() {
p.unlimitedAllocator.PerformOperation(p.output.ColVecs(), func() {
for i := range p.inputTypes {
toCol := p.output.ColVec(i)
fromCol := batch.ColVec(i)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func TestDistinct(t *testing.T) {
}
tc.runTests(t, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {
return newPartiallyOrderedDistinct(
testAllocator, input[0], tc.distinctCols, orderedCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup,
testAllocator, testAllocator, input[0], tc.distinctCols, orderedCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup,
)
})
}
Expand Down Expand Up @@ -630,7 +630,7 @@ func BenchmarkDistinct(b *testing.B) {
return NewUnorderedDistinct(allocator, input, distinctCols, typs, false /* nullsAreDistinct */, "" /* errorOnDup */), nil
},
func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error) {
return newPartiallyOrderedDistinct(allocator, input, distinctCols, distinctCols[:numOrderedCols], typs, false /* nullsAreDistinct */, "" /* errorOnDup */)
return newPartiallyOrderedDistinct(allocator, allocator, input, distinctCols, distinctCols[:numOrderedCols], typs, false /* nullsAreDistinct */, "" /* errorOnDup */)
},
func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error) {
return colexecbase.NewOrderedDistinct(input, distinctCols, typs, false /* nullsAreDistinct */, "" /* errorOnDup */), nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/partially_ordered_distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
// distinct columns when we have partial ordering on some of the distinct
// columns.
func newPartiallyOrderedDistinct(
unlimitedAllocator *colmem.Allocator,
allocator *colmem.Allocator,
input colexecop.Operator,
distinctCols []uint32,
Expand All @@ -39,7 +40,7 @@ func newPartiallyOrderedDistinct(
"partially ordered distinct wrongfully planned: numDistinctCols=%d "+
"numOrderedCols=%d", len(distinctCols), len(orderedCols))
}
chunker := newChunker(allocator, input, typs, orderedCols, nullsAreDistinct)
chunker := newChunker(unlimitedAllocator, allocator, input, typs, orderedCols, nullsAreDistinct)
chunkerOperator := newChunkerOperator(allocator, chunker, typs)
// distinctUnorderedCols will contain distinct columns that are not present
// among orderedCols. The unordered distinct operator will use these columns
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/colexec/sort_chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
// the columns in the input operator. The input tuples must be sorted on first
// matchLen columns.
func NewSortChunks(
unlimitedAllocator *colmem.Allocator,
allocator *colmem.Allocator,
input colexecop.Operator,
inputTypes []*types.T,
Expand All @@ -46,7 +47,7 @@ func NewSortChunks(
for i := range alreadySortedCols {
alreadySortedCols[i] = orderingCols[i].ColIdx
}
chunker := newChunker(allocator, input, inputTypes, alreadySortedCols, false /* nullsAreDistinct */)
chunker := newChunker(unlimitedAllocator, allocator, input, inputTypes, alreadySortedCols, false /* nullsAreDistinct */)
sorter := newSorter(allocator, chunker, inputTypes, orderingCols[matchLen:], maxOutputBatchMemSize)
return &sortChunksOp{allocator: allocator, input: chunker, sorter: sorter}
}
Expand Down Expand Up @@ -256,6 +257,7 @@ type chunker struct {
var _ spooler = &chunker{}

func newChunker(
unlimitedAllocator *colmem.Allocator,
allocator *colmem.Allocator,
input colexecop.Operator,
inputTypes []*types.T,
Expand All @@ -266,7 +268,7 @@ func newChunker(
for i, col := range alreadySortedCols {
partitioners[i] = newPartitioner(inputTypes[col], nullsAreDistinct)
}
deselector := colexecutils.NewDeselectorOp(allocator, input, inputTypes)
deselector := colexecutils.NewDeselectorOp(unlimitedAllocator, input, inputTypes)
return &chunker{
OneInputNode: colexecop.NewOneInputNode(deselector),
allocator: allocator,
Expand Down
8 changes: 5 additions & 3 deletions pkg/sql/colexec/sort_chunks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestSortChunks(t *testing.T) {

for _, tc := range sortChunksTestCases {
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {
return NewSortChunks(testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, execinfra.DefaultMemoryLimit), nil
return NewSortChunks(testAllocator, testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, execinfra.DefaultMemoryLimit), nil
})
}
}
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestSortChunksRandomized(t *testing.T) {
sort.Slice(expected, less(expected, ordCols))

colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{sortedTups}, expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {
return NewSortChunks(testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil
return NewSortChunks(testAllocator, testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil
})
}
}
Expand All @@ -244,7 +244,9 @@ func BenchmarkSortChunks(b *testing.B) {
ctx := context.Background()

sorterConstructors := []func(*colmem.Allocator, colexecop.Operator, []*types.T, []execinfrapb.Ordering_Column, int, int64) colexecop.Operator{
NewSortChunks,
func(allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, matchLen int, maxOutputBatchMemSize int64) colexecop.Operator {
return NewSortChunks(allocator, allocator, input, inputTypes, orderingCols, matchLen, maxOutputBatchMemSize)
},
func(allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, _ int, maxOutputBatchMemSize int64) colexecop.Operator {
return NewSorter(allocator, input, inputTypes, orderingCols, maxOutputBatchMemSize)
},
Expand Down
26 changes: 13 additions & 13 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ type Outbox struct {

typs []*types.T

allocator *colmem.Allocator
converter *colserde.ArrowBatchConverter
serializer *colserde.RecordBatchSerializer
unlimitedAllocator *colmem.Allocator
converter *colserde.ArrowBatchConverter
serializer *colserde.RecordBatchSerializer

// draining is an atomic that represents whether the Outbox is draining.
draining uint32
Expand All @@ -83,7 +83,7 @@ type Outbox struct {
// - getStats, when non-nil, returns all of the execution statistics of the
// operators that are in the same tree as this Outbox.
func NewOutbox(
allocator *colmem.Allocator,
unlimitedAllocator *colmem.Allocator,
input colexecargs.OpWithMetaInfo,
typs []*types.T,
getStats func() []*execinfrapb.ComponentStats,
Expand All @@ -99,13 +99,13 @@ func NewOutbox(
o := &Outbox{
// Add a deselector as selection vectors are not serialized (nor should they
// be).
OneInputNode: colexecop.NewOneInputNode(colexecutils.NewDeselectorOp(allocator, input.Root, typs)),
inputMetaInfo: input,
typs: typs,
allocator: allocator,
converter: c,
serializer: s,
getStats: getStats,
OneInputNode: colexecop.NewOneInputNode(colexecutils.NewDeselectorOp(unlimitedAllocator, input.Root, typs)),
inputMetaInfo: input,
typs: typs,
unlimitedAllocator: unlimitedAllocator,
converter: c,
serializer: s,
getStats: getStats,
}
o.scratch.buf = &bytes.Buffer{}
o.scratch.msg = &execinfrapb.ProducerMessage{}
Expand All @@ -120,7 +120,7 @@ func (o *Outbox) close(ctx context.Context) {
// registered with the allocator (the allocator is shared by the outbox and
// the deselector).
o.Input = nil
o.allocator.ReleaseMemory(o.allocator.Used())
o.unlimitedAllocator.ReleaseMemory(o.unlimitedAllocator.Used())
o.inputMetaInfo.ToClose.CloseAndLogOnErr(ctx, "outbox")
}

Expand Down Expand Up @@ -312,7 +312,7 @@ func (o *Outbox) sendBatches(
// Note that because we never truncate the buffer, we are only
// adjusting the memory usage whenever the buffer's capacity
// increases (if it didn't increase, this call becomes a noop).
o.allocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap))
o.unlimitedAllocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap))
o.scratch.msg.Data.RawBytes = o.scratch.buf.Bytes()

// o.scratch.msg can be reused as soon as Send returns since it returns as
Expand Down

0 comments on commit 120fbbb

Please sign in to comment.