colexec: fix sort chunks with disk spilling in very rare circumstances #80679

Merged 1 commit on Apr 28, 2022
12 changes: 9 additions & 3 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -379,13 +379,19 @@ func (r opResult) createDiskBackedSort(
 	} else if matchLen > 0 {
 		// The input is already partially ordered. Use a chunks sorter to avoid
 		// loading all the rows into memory.
+		opName := opNamePrefix + "sort-chunks"
+		deselectorUnlimitedAllocator := colmem.NewAllocator(
+			ctx, args.MonitorRegistry.CreateUnlimitedMemAccount(
+				ctx, flowCtx, opName, processorID,
+			), factory,
+		)
 		var sortChunksMemAccount *mon.BoundAccount
 		sortChunksMemAccount, sorterMemMonitorName = args.MonitorRegistry.CreateMemAccountForSpillStrategyWithLimit(
-			ctx, flowCtx, spoolMemLimit, opNamePrefix+"sort-chunks", processorID,
+			ctx, flowCtx, spoolMemLimit, opName, processorID,
 		)
 		inMemorySorter = colexec.NewSortChunks(
-			colmem.NewAllocator(ctx, sortChunksMemAccount, factory), input, inputTypes,
-			ordering.Columns, int(matchLen), maxOutputBatchMemSize,
+			deselectorUnlimitedAllocator, colmem.NewAllocator(ctx, sortChunksMemAccount, factory),
+			input, inputTypes, ordering.Columns, int(matchLen), maxOutputBatchMemSize,
 		)
 	} else {
 		// No optimizations possible. Default to the standard sort operator.
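Why the deselector gets its own unlimited account: it sits below the disk-spilling boundary of the sort chunks operator, so if its allocation were charged to the limited account and failed after it had already pulled a batch from its input, the disk-backed fallback would resume from the input and that batch would be silently dropped, which is the rare wrong-results scenario this change fixes. The following stand-alone Go sketch only illustrates that failure mode; every name in it (bufferingOp, runWithSpilling, batches represented as ints) is a hypothetical stand-in, not the real colexec machinery.

package main

import (
	"errors"
	"fmt"
)

var errBudgetExceeded = errors.New("memory budget exceeded")

// bufferingOp models an operator like deselectorOp: it consumes a batch from
// its input and then allocates an output batch; it has no disk-backed
// fallback of its own. Batches are just ints here.
type bufferingOp struct {
	input   []int // batches not yet read from the input
	limited bool  // true if its allocator can hit the memory budget
}

// next returns the next batch, or an error if the post-consumption allocation
// exceeds the budget. A zero return means the input is exhausted.
func (o *bufferingOp) next() (int, error) {
	if len(o.input) == 0 {
		return 0, nil
	}
	batch := o.input[0]
	o.input = o.input[1:] // the batch has now been consumed from the input
	if o.limited {
		return 0, errBudgetExceeded // fails *after* consuming the batch
	}
	return batch, nil
}

// runWithSpilling models the higher-level disk spiller: on a budget error it
// falls back to a strategy that re-reads whatever is still left in the input,
// so anything already consumed by the in-memory operator is lost.
func runWithSpilling(op *bufferingOp) []int {
	var out []int
	for {
		b, err := op.next()
		if err != nil {
			out = append(out, op.input...) // "spill": drain the remaining input
			return out
		}
		if b == 0 {
			return out
		}
		out = append(out, b)
	}
}

func main() {
	fmt.Println(runWithSpilling(&bufferingOp{input: []int{1, 2, 3}, limited: true}))  // [2 3]: batch 1 vanished
	fmt.Println(runWithSpilling(&bufferingOp{input: []int{1, 2, 3}, limited: false})) // [1 2 3]
}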
38 changes: 17 additions & 21 deletions pkg/sql/colexec/colexecutils/deselector.go
@@ -26,8 +26,8 @@ import (
 type deselectorOp struct {
 	colexecop.OneInputHelper
 	colexecop.NonExplainable
-	allocator  *colmem.Allocator
-	inputTypes []*types.T
+	unlimitedAllocator *colmem.Allocator
+	inputTypes         []*types.T
 
 	output coldata.Batch
 }
@@ -36,39 +36,35 @@ var _ colexecop.Operator = &deselectorOp{}
 
 // NewDeselectorOp creates a new deselector operator on the given input
 // operator with the given column types.
+//
+// The provided allocator must be derived from an unlimited memory monitor since
+// the deselectorOp cannot spill to disk and a "memory budget exceeded" error
+// might be caught by the higher-level diskSpiller which would result in losing
+// some query results.
 func NewDeselectorOp(
-	allocator *colmem.Allocator, input colexecop.Operator, typs []*types.T,
+	unlimitedAllocator *colmem.Allocator, input colexecop.Operator, typs []*types.T,
 ) colexecop.Operator {
 	return &deselectorOp{
-		OneInputHelper: colexecop.MakeOneInputHelper(input),
-		allocator:      allocator,
-		inputTypes:     typs,
+		OneInputHelper:     colexecop.MakeOneInputHelper(input),
+		unlimitedAllocator: unlimitedAllocator,
+		inputTypes:         typs,
 	}
 }
 
 func (p *deselectorOp) Next() coldata.Batch {
-	// deselectorOp should *not* limit the capacities of the returned batches,
-	// so we don't use a memory limit here. It is up to the wrapped operator to
-	// limit the size of batches based on the memory footprint.
-	const maxBatchMemSize = math.MaxInt64
-	// TODO(yuzefovich): this allocation is only needed in order to appease the
-	// tests of the external sorter with forced disk spilling (if we don't do
-	// this, an OOM error occurs during ResetMaybeReallocate call below at
-	// which point we have already received a batch from the input and it'll
-	// get lost because deselectorOp doesn't support fall-over to the
-	// disk-backed infrastructure).
-	p.output, _ = p.allocator.ResetMaybeReallocate(
-		p.inputTypes, p.output, 1 /* minCapacity */, maxBatchMemSize,
-	)
 	batch := p.Input.Next()
 	if batch.Selection() == nil || batch.Length() == 0 {
 		return batch
 	}
-	p.output, _ = p.allocator.ResetMaybeReallocate(
+	// deselectorOp should *not* limit the capacities of the returned batches,
+	// so we don't use a memory limit here. It is up to the wrapped operator to
+	// limit the size of batches based on the memory footprint.
+	const maxBatchMemSize = math.MaxInt64
+	p.output, _ = p.unlimitedAllocator.ResetMaybeReallocate(
 		p.inputTypes, p.output, batch.Length(), maxBatchMemSize,
 	)
 	sel := batch.Selection()
-	p.allocator.PerformOperation(p.output.ColVecs(), func() {
+	p.unlimitedAllocator.PerformOperation(p.output.ColVecs(), func() {
 		for i := range p.inputTypes {
 			toCol := p.output.ColVec(i)
 			fromCol := batch.ColVec(i)
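For context on what this operator does: a selection vector marks which rows of a physical batch are logically live, and the deselector copies only those rows into a dense output batch so that consumers never have to handle a selection vector. A rough stand-alone sketch of that copy using plain slices (a toy batch type, not the real coldata.Batch):

package main

import "fmt"

// batch is a toy stand-in for coldata.Batch: a few columns plus an optional
// selection vector listing the indices of the logically selected rows.
type batch struct {
	cols [][]int64
	sel  []int // nil means "all rows are selected"
}

// deselect copies only the selected rows of each column into a dense batch
// with no selection vector, mirroring what deselectorOp.Next does.
func deselect(b batch) batch {
	if b.sel == nil {
		return b // nothing to do, the batch is already dense
	}
	out := batch{cols: make([][]int64, len(b.cols))}
	for i, fromCol := range b.cols {
		toCol := make([]int64, len(b.sel))
		for j, rowIdx := range b.sel {
			toCol[j] = fromCol[rowIdx]
		}
		out.cols[i] = toCol
	}
	return out
}

func main() {
	in := batch{
		cols: [][]int64{{10, 20, 30, 40}, {1, 2, 3, 4}},
		sel:  []int{0, 2}, // only rows 0 and 2 survived an upstream filter
	}
	fmt.Println(deselect(in).cols) // [[10 30] [1 3]]
}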
4 changes: 2 additions & 2 deletions pkg/sql/colexec/distinct_test.go
@@ -405,7 +405,7 @@ func TestDistinct(t *testing.T) {
 		}
 		tc.runTests(t, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {
 			return newPartiallyOrderedDistinct(
-				testAllocator, input[0], tc.distinctCols, orderedCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup,
+				testAllocator, testAllocator, input[0], tc.distinctCols, orderedCols, tc.typs, tc.nullsAreDistinct, tc.errorOnDup,
 			)
 		})
 	}
@@ -630,7 +630,7 @@ func BenchmarkDistinct(b *testing.B) {
 			return NewUnorderedDistinct(allocator, input, distinctCols, typs, false /* nullsAreDistinct */, "" /* errorOnDup */), nil
 		},
 		func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error) {
-			return newPartiallyOrderedDistinct(allocator, input, distinctCols, distinctCols[:numOrderedCols], typs, false /* nullsAreDistinct */, "" /* errorOnDup */)
+			return newPartiallyOrderedDistinct(allocator, allocator, input, distinctCols, distinctCols[:numOrderedCols], typs, false /* nullsAreDistinct */, "" /* errorOnDup */)
 		},
 		func(allocator *colmem.Allocator, input colexecop.Operator, distinctCols []uint32, numOrderedCols int, typs []*types.T) (colexecop.Operator, error) {
 			return colexecbase.NewOrderedDistinct(input, distinctCols, typs, false /* nullsAreDistinct */, "" /* errorOnDup */), nil
3 changes: 2 additions & 1 deletion pkg/sql/colexec/partially_ordered_distinct.go
@@ -26,6 +26,7 @@ import (
 // distinct columns when we have partial ordering on some of the distinct
 // columns.
 func newPartiallyOrderedDistinct(
+	unlimitedAllocator *colmem.Allocator,
 	allocator *colmem.Allocator,
 	input colexecop.Operator,
 	distinctCols []uint32,
@@ -39,7 +40,7 @@ func newPartiallyOrderedDistinct(
 			"partially ordered distinct wrongfully planned: numDistinctCols=%d "+
 				"numOrderedCols=%d", len(distinctCols), len(orderedCols))
 	}
-	chunker := newChunker(allocator, input, typs, orderedCols, nullsAreDistinct)
+	chunker := newChunker(unlimitedAllocator, allocator, input, typs, orderedCols, nullsAreDistinct)
 	chunkerOperator := newChunkerOperator(allocator, chunker, typs)
 	// distinctUnorderedCols will contain distinct columns that are not present
 	// among orderedCols. The unordered distinct operator will use these columns
6 changes: 4 additions & 2 deletions pkg/sql/colexec/sort_chunks.go
@@ -29,6 +29,7 @@ import (
 // the columns in the input operator. The input tuples must be sorted on first
 // matchLen columns.
 func NewSortChunks(
+	unlimitedAllocator *colmem.Allocator,
 	allocator *colmem.Allocator,
 	input colexecop.Operator,
 	inputTypes []*types.T,
@@ -46,7 +47,7 @@ func NewSortChunks(
 	for i := range alreadySortedCols {
 		alreadySortedCols[i] = orderingCols[i].ColIdx
 	}
-	chunker := newChunker(allocator, input, inputTypes, alreadySortedCols, false /* nullsAreDistinct */)
+	chunker := newChunker(unlimitedAllocator, allocator, input, inputTypes, alreadySortedCols, false /* nullsAreDistinct */)
 	sorter := newSorter(allocator, chunker, inputTypes, orderingCols[matchLen:], maxOutputBatchMemSize)
 	return &sortChunksOp{allocator: allocator, input: chunker, sorter: sorter}
 }
@@ -256,6 +257,7 @@ type chunker struct {
 var _ spooler = &chunker{}
 
 func newChunker(
+	unlimitedAllocator *colmem.Allocator,
 	allocator *colmem.Allocator,
 	input colexecop.Operator,
 	inputTypes []*types.T,
@@ -266,7 +268,7 @@ func newChunker(
 	for i, col := range alreadySortedCols {
 		partitioners[i] = newPartitioner(inputTypes[col], nullsAreDistinct)
 	}
-	deselector := colexecutils.NewDeselectorOp(allocator, input, inputTypes)
+	deselector := colexecutils.NewDeselectorOp(unlimitedAllocator, input, inputTypes)
 	return &chunker{
 		OneInputNode: colexecop.NewOneInputNode(deselector),
 		allocator:    allocator,
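As a reminder of the technique itself: since the input is already sorted on the first matchLen columns, sort chunks only needs to buffer one run of rows with equal prefix values at a time and finish the ordering on the remaining columns within each run. A simplified, non-vectorized illustration on a toy row type, assuming matchLen of 1 (none of these names come from the PR):

package main

import (
	"fmt"
	"sort"
)

type row struct{ a, b int }

// sortChunks assumes rows are already sorted on column a (the matchLen=1
// prefix) and finishes the ordering by sorting each run of equal a values on
// column b. Only one chunk needs to be held in memory at a time.
func sortChunks(rows []row) []row {
	out := make([]row, 0, len(rows))
	for start := 0; start < len(rows); {
		end := start
		for end < len(rows) && rows[end].a == rows[start].a {
			end++
		}
		chunk := append([]row(nil), rows[start:end]...)
		sort.Slice(chunk, func(i, j int) bool { return chunk[i].b < chunk[j].b })
		out = append(out, chunk...)
		start = end
	}
	return out
}

func main() {
	in := []row{{1, 9}, {1, 3}, {2, 7}, {2, 1}, {3, 5}}
	fmt.Println(sortChunks(in)) // [{1 3} {1 9} {2 1} {2 7} {3 5}]
}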
8 changes: 5 additions & 3 deletions pkg/sql/colexec/sort_chunks_test.go
@@ -189,7 +189,7 @@ func TestSortChunks(t *testing.T) {
 
 	for _, tc := range sortChunksTestCases {
 		colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {
-			return NewSortChunks(testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, execinfra.DefaultMemoryLimit), nil
+			return NewSortChunks(testAllocator, testAllocator, input[0], tc.typs, tc.ordCols, tc.matchLen, execinfra.DefaultMemoryLimit), nil
 		})
 	}
 }
@@ -231,7 +231,7 @@ func TestSortChunksRandomized(t *testing.T) {
 			sort.Slice(expected, less(expected, ordCols))
 
 			colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{sortedTups}, expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {
-				return NewSortChunks(testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil
+				return NewSortChunks(testAllocator, testAllocator, input[0], typs[:nCols], ordCols, matchLen, execinfra.DefaultMemoryLimit), nil
 			})
 		}
 	}
@@ -244,7 +244,9 @@ func BenchmarkSortChunks(b *testing.B) {
 	ctx := context.Background()
 
 	sorterConstructors := []func(*colmem.Allocator, colexecop.Operator, []*types.T, []execinfrapb.Ordering_Column, int, int64) colexecop.Operator{
-		NewSortChunks,
+		func(allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, matchLen int, maxOutputBatchMemSize int64) colexecop.Operator {
+			return NewSortChunks(allocator, allocator, input, inputTypes, orderingCols, matchLen, maxOutputBatchMemSize)
+		},
 		func(allocator *colmem.Allocator, input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column, _ int, maxOutputBatchMemSize int64) colexecop.Operator {
 			return NewSorter(allocator, input, inputTypes, orderingCols, maxOutputBatchMemSize)
 		},
26 changes: 13 additions & 13 deletions pkg/sql/colflow/colrpc/outbox.go
@@ -55,9 +55,9 @@ type Outbox struct {
 
 	typs []*types.T
 
-	allocator  *colmem.Allocator
-	converter  *colserde.ArrowBatchConverter
-	serializer *colserde.RecordBatchSerializer
+	unlimitedAllocator *colmem.Allocator
+	converter          *colserde.ArrowBatchConverter
+	serializer         *colserde.RecordBatchSerializer
 
 	// draining is an atomic that represents whether the Outbox is draining.
 	draining uint32
@@ -83,7 +83,7 @@ type Outbox struct {
 // - getStats, when non-nil, returns all of the execution statistics of the
 //   operators that are in the same tree as this Outbox.
 func NewOutbox(
-	allocator *colmem.Allocator,
+	unlimitedAllocator *colmem.Allocator,
 	input colexecargs.OpWithMetaInfo,
 	typs []*types.T,
 	getStats func() []*execinfrapb.ComponentStats,
@@ -99,13 +99,13 @@ func NewOutbox(
 	o := &Outbox{
 		// Add a deselector as selection vectors are not serialized (nor should they
 		// be).
-		OneInputNode:  colexecop.NewOneInputNode(colexecutils.NewDeselectorOp(allocator, input.Root, typs)),
-		inputMetaInfo: input,
-		typs:          typs,
-		allocator:     allocator,
-		converter:     c,
-		serializer:    s,
-		getStats:      getStats,
+		OneInputNode:       colexecop.NewOneInputNode(colexecutils.NewDeselectorOp(unlimitedAllocator, input.Root, typs)),
+		inputMetaInfo:      input,
+		typs:               typs,
+		unlimitedAllocator: unlimitedAllocator,
+		converter:          c,
+		serializer:         s,
+		getStats:           getStats,
 	}
 	o.scratch.buf = &bytes.Buffer{}
 	o.scratch.msg = &execinfrapb.ProducerMessage{}
@@ -120,7 +120,7 @@ func (o *Outbox) close(ctx context.Context) {
 	// registered with the allocator (the allocator is shared by the outbox and
 	// the deselector).
 	o.Input = nil
-	o.allocator.ReleaseMemory(o.allocator.Used())
+	o.unlimitedAllocator.ReleaseMemory(o.unlimitedAllocator.Used())
 	o.inputMetaInfo.ToClose.CloseAndLogOnErr(ctx, "outbox")
 }
 
@@ -312,7 +312,7 @@ func (o *Outbox) sendBatches(
 			// Note that because we never truncate the buffer, we are only
 			// adjusting the memory usage whenever the buffer's capacity
 			// increases (if it didn't increase, this call becomes a noop).
-			o.allocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap))
+			o.unlimitedAllocator.AdjustMemoryUsage(int64(o.scratch.buf.Cap() - oldBufCap))
 			o.scratch.msg.Data.RawBytes = o.scratch.buf.Bytes()
 
 			// o.scratch.msg can be reused as soon as Send returns since it returns as
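The AdjustMemoryUsage call in sendBatches relies on a small invariant worth spelling out: the scratch buffer is reused across batches and never truncated, so only capacity growth ever needs to be reported to the monitor, and the delta can never be negative. A stand-alone sketch of that bookkeeping with a plain bytes.Buffer and a toy accounting callback standing in for the allocator method (sender and adjust are made-up names, not part of the Outbox):

package main

import (
	"bytes"
	"fmt"
)

// sender reuses one scratch buffer for every serialized payload and reports
// only capacity growth to the accounting hook, mirroring the pattern in
// Outbox.sendBatches.
type sender struct {
	buf     bytes.Buffer
	tracked int64             // bytes currently reported to the "monitor"
	adjust  func(delta int64) // toy stand-in for Allocator.AdjustMemoryUsage
}

func (s *sender) send(payload []byte) {
	oldCap := s.buf.Cap()
	s.buf.Reset() // reuse the underlying storage, keep the capacity
	s.buf.Write(payload)
	// Only the capacity delta matters: if the buffer did not grow, this is a
	// no-op, and since the buffer is never truncated the delta is never negative.
	delta := int64(s.buf.Cap() - oldCap)
	s.tracked += delta
	s.adjust(delta)
}

func main() {
	s := &sender{adjust: func(d int64) { fmt.Println("adjust by", d) }}
	s.send(make([]byte, 100)) // buffer grows: positive adjustment
	s.send(make([]byte, 50))  // fits in existing capacity: adjust by 0
	s.send(make([]byte, 500)) // grows again
	fmt.Println("tracked bytes:", s.tracked, "cap:", s.buf.Cap())
}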