Skip to content

Commit

Permalink
Merge #67493
Browse files Browse the repository at this point in the history
67493: colexecutils: simplify usage of AppendOnlyBufferedBatch a bit r=yuzefovich a=yuzefovich

This commit pushes the burden of memory accounting for the newly
allocated memory in AppendTuples method into the method itself. This
allows us to clean up the code slightly.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Jul 12, 2021
2 parents 31ac48c + 0e78460 commit 835da51
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 45 deletions.
8 changes: 2 additions & 6 deletions pkg/sql/colexec/colexechash/hashtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,7 @@ func (ht *HashTable) FullBuild(input colexecop.Operator) {
if batch.Length() == 0 {
break
}
ht.allocator.PerformAppend(ht.Vals, func() {
ht.Vals.AppendTuples(batch, 0 /* startIdx */, batch.Length())
})
ht.Vals.AppendTuples(batch, 0 /* startIdx */, batch.Length())
}
ht.buildFromBufferedTuples()
}
Expand Down Expand Up @@ -483,9 +481,7 @@ func (ht *HashTable) RemoveDuplicates(
// NOTE: batch must be of non-zero length.
func (ht *HashTable) AppendAllDistinct(batch coldata.Batch) {
numBuffered := uint64(ht.Vals.Length())
ht.allocator.PerformAppend(ht.Vals, func() {
ht.Vals.AppendTuples(batch, 0 /* startIdx */, batch.Length())
})
ht.Vals.AppendTuples(batch, 0 /* startIdx */, batch.Length())
ht.BuildScratch.Next = append(ht.BuildScratch.Next, ht.ProbeScratch.HashBuffer[:batch.Length()]...)
ht.buildNextChains(ht.BuildScratch.First, ht.BuildScratch.Next, numBuffered+1, uint64(batch.Length()))
if ht.shouldResize(ht.Vals.Length()) {
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/colexec/colexecutils/spilling_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,7 @@ func (b *SpillingBuffer) AppendTuples(
maxInMemTuplesLimitReached := b.testingKnobs.maxTuplesStoredInMemory > 0 &&
b.bufferedTuples.Length() >= b.testingKnobs.maxTuplesStoredInMemory
if !memLimitReached && b.diskQueue == nil && !maxInMemTuplesLimitReached {
b.unlimitedAllocator.PerformAppend(b.bufferedTuples, func() {
b.bufferedTuples.AppendTuples(batch, startIdx, endIdx)
})
b.bufferedTuples.AppendTuples(batch, startIdx, endIdx)
return
}
// Not all tuples could be stored in-memory; they will have to be placed in
Expand Down
32 changes: 18 additions & 14 deletions pkg/sql/colexec/colexecutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func NewAppendOnlyBufferedBatch(
batch := allocator.NewMemBatchWithFixedCapacity(typs, 0 /* capacity */)
return &AppendOnlyBufferedBatch{
batch: batch,
allocator: allocator,
colVecs: batch.ColVecs(),
colsToStore: colsToStore,
}
Expand All @@ -97,6 +98,7 @@ type AppendOnlyBufferedBatch struct {
// through the implementation of each method of coldata.Batch interface.
batch coldata.Batch

allocator *colmem.Allocator
length int
colVecs []coldata.Vec
colsToStore []int
Expand Down Expand Up @@ -194,22 +196,24 @@ func (b *AppendOnlyBufferedBatch) String() string {

// AppendTuples is a helper method that appends all tuples with indices in range
// [startIdx, endIdx) from batch (paying attention to the selection vector)
// into b.
// NOTE: this does *not* perform memory accounting.
// into b. The newly allocated memory is registered with the allocator used to
// create this AppendOnlyBufferedBatch.
// NOTE: batch must be of non-zero length.
func (b *AppendOnlyBufferedBatch) AppendTuples(batch coldata.Batch, startIdx, endIdx int) {
for _, colIdx := range b.colsToStore {
b.colVecs[colIdx].Append(
coldata.SliceArgs{
Src: batch.ColVec(colIdx),
Sel: batch.Selection(),
DestIdx: b.length,
SrcStartIdx: startIdx,
SrcEndIdx: endIdx,
},
)
}
b.length += endIdx - startIdx
b.allocator.PerformAppend(b, func() {
for _, colIdx := range b.colsToStore {
b.colVecs[colIdx].Append(
coldata.SliceArgs{
Src: batch.ColVec(colIdx),
Sel: batch.Selection(),
DestIdx: b.length,
SrcStartIdx: startIdx,
SrcEndIdx: endIdx,
},
)
}
b.length += endIdx - startIdx
})
}

// MaybeAllocateUint64Array makes sure that the passed in array is allocated, of
Expand Down
12 changes: 4 additions & 8 deletions pkg/sql/colexec/hash_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,9 @@ func (op *hashAggregator) Next() coldata.Batch {
switch op.state {
case hashAggregatorBuffering:
if op.bufferingState.pendingBatch != nil && op.bufferingState.unprocessedIdx < op.bufferingState.pendingBatch.Length() {
op.allocator.PerformAppend(op.bufferingState.tuples, func() {
op.bufferingState.tuples.AppendTuples(
op.bufferingState.pendingBatch, op.bufferingState.unprocessedIdx, op.bufferingState.pendingBatch.Length(),
)
})
op.bufferingState.tuples.AppendTuples(
op.bufferingState.pendingBatch, op.bufferingState.unprocessedIdx, op.bufferingState.pendingBatch.Length(),
)
}
op.bufferingState.pendingBatch, op.bufferingState.unprocessedIdx = op.Input.Next(), 0
n := op.bufferingState.pendingBatch.Length()
Expand Down Expand Up @@ -268,9 +266,7 @@ func (op *hashAggregator) Next() coldata.Batch {
toBuffer = op.maxBuffered - op.bufferingState.tuples.Length()
}
if toBuffer > 0 {
op.allocator.PerformAppend(op.bufferingState.tuples, func() {
op.bufferingState.tuples.AppendTuples(op.bufferingState.pendingBatch, 0 /* startIdx */, toBuffer)
})
op.bufferingState.tuples.AppendTuples(op.bufferingState.pendingBatch, 0 /* startIdx */, toBuffer)
op.bufferingState.unprocessedIdx = toBuffer
}
if op.bufferingState.tuples.Length() == op.maxBuffered {
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/colexec/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,7 @@ func (p *allSpooler) spool() {
}
p.spooled = true
for batch := p.Input.Next(); batch.Length() != 0; batch = p.Input.Next() {
p.allocator.PerformAppend(p.bufferedTuples, func() {
p.bufferedTuples.AppendTuples(batch, 0 /* startIdx */, batch.Length())
})
p.bufferedTuples.AppendTuples(batch, 0 /* startIdx */, batch.Length())
}
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/sql/colexec/sort_chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,8 @@ func (s *chunker) buffer(start int, end int) {
if start == end {
return
}
s.allocator.PerformAppend(s.bufferedTuples, func() {
s.exportState.numProcessedTuplesFromBatch = end
s.bufferedTuples.AppendTuples(s.batch, start, end)
})
s.exportState.numProcessedTuplesFromBatch = end
s.bufferedTuples.AppendTuples(s.batch, start, end)
}

func (s *chunker) spool() {
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/colexec/sorttopk.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,7 @@ func (t *topKSorter) spool() {
fromLength = int(remainingRows)
}
t.firstUnprocessedTupleIdx = fromLength
t.allocator.PerformAppend(t.topK, func() {
t.topK.AppendTuples(t.inputBatch, 0 /* startIdx */, fromLength)
})
t.topK.AppendTuples(t.inputBatch, 0 /* startIdx */, fromLength)
remainingRows -= uint64(fromLength)
if fromLength == t.inputBatch.Length() {
t.inputBatch = t.Input.Next()
Expand Down
13 changes: 9 additions & 4 deletions pkg/sql/colmem/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,19 @@ func TestPerformAppend(t *testing.T) {

beforePerformOperation := testAllocator.Used()
testAllocator.PerformOperation(batch1.ColVecs(), func() {
batch1.AppendTuples(inputBatch, 0 /* startIdx */, inputBatch.Length())
for colIdx, destVec := range batch1.ColVecs() {
destVec.Append(coldata.SliceArgs{
Src: inputBatch.ColVec(colIdx),
DestIdx: batch1.Length(),
SrcEndIdx: inputBatch.Length(),
})
}
batch1.SetLength(batch1.Length() + inputBatch.Length())
})
afterPerformOperation := testAllocator.Used()

beforePerformAppend := afterPerformOperation
testAllocator.PerformAppend(batch2, func() {
batch2.AppendTuples(inputBatch, 0 /* startIdx */, inputBatch.Length())
})
batch2.AppendTuples(inputBatch, 0 /* startIdx */, inputBatch.Length())
afterPerformAppend := testAllocator.Used()

performOperationMem := afterPerformOperation - beforePerformOperation
Expand Down
1 change: 1 addition & 0 deletions pkg/testutils/lint/lint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1877,6 +1877,7 @@ func TestLint(t *testing.T) {
"--",
"sql/col*",
":!sql/colexec/colexecutils/utils.go",
":!sql/colmem/allocator_test.go",
)
if err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 835da51

Please sign in to comment.