From 0e78460e0ef3e3c2c0069e148fa2c7cb7c0f9f0f Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 12 Jul 2021 08:39:54 -0700 Subject: [PATCH] colexecutils: simplify usage of AppendOnlyBufferedBatch a bit This commit pushes the burden of memory accounting for the newly allocated memory in AppendTuples method into the method itself. This allows us to clean up the code slightly. Release note: None --- pkg/sql/colexec/colexechash/hashtable.go | 8 ++--- .../colexec/colexecutils/spilling_buffer.go | 4 +-- pkg/sql/colexec/colexecutils/utils.go | 32 +++++++++++-------- pkg/sql/colexec/hash_aggregator.go | 12 +++---- pkg/sql/colexec/sort.go | 4 +-- pkg/sql/colexec/sort_chunks.go | 6 ++-- pkg/sql/colexec/sorttopk.go | 4 +-- pkg/sql/colmem/allocator_test.go | 13 +++++--- pkg/testutils/lint/lint_test.go | 1 + 9 files changed, 39 insertions(+), 45 deletions(-) diff --git a/pkg/sql/colexec/colexechash/hashtable.go b/pkg/sql/colexec/colexechash/hashtable.go index d74c9f0abbaa..fff7e3446b15 100644 --- a/pkg/sql/colexec/colexechash/hashtable.go +++ b/pkg/sql/colexec/colexechash/hashtable.go @@ -365,9 +365,7 @@ func (ht *HashTable) FullBuild(input colexecop.Operator) { if batch.Length() == 0 { break } - ht.allocator.PerformAppend(ht.Vals, func() { - ht.Vals.AppendTuples(batch, 0 /* startIdx */, batch.Length()) - }) + ht.Vals.AppendTuples(batch, 0 /* startIdx */, batch.Length()) } ht.buildFromBufferedTuples() } @@ -483,9 +481,7 @@ func (ht *HashTable) RemoveDuplicates( // NOTE: batch must be of non-zero length. func (ht *HashTable) AppendAllDistinct(batch coldata.Batch) { numBuffered := uint64(ht.Vals.Length()) - ht.allocator.PerformAppend(ht.Vals, func() { - ht.Vals.AppendTuples(batch, 0 /* startIdx */, batch.Length()) - }) + ht.Vals.AppendTuples(batch, 0 /* startIdx */, batch.Length()) ht.BuildScratch.Next = append(ht.BuildScratch.Next, ht.ProbeScratch.HashBuffer[:batch.Length()]...) ht.buildNextChains(ht.BuildScratch.First, ht.BuildScratch.Next, numBuffered+1, uint64(batch.Length())) if ht.shouldResize(ht.Vals.Length()) { diff --git a/pkg/sql/colexec/colexecutils/spilling_buffer.go b/pkg/sql/colexec/colexecutils/spilling_buffer.go index bd5746335182..64323b51893e 100644 --- a/pkg/sql/colexec/colexecutils/spilling_buffer.go +++ b/pkg/sql/colexec/colexecutils/spilling_buffer.go @@ -168,9 +168,7 @@ func (b *SpillingBuffer) AppendTuples( maxInMemTuplesLimitReached := b.testingKnobs.maxTuplesStoredInMemory > 0 && b.bufferedTuples.Length() >= b.testingKnobs.maxTuplesStoredInMemory if !memLimitReached && b.diskQueue == nil && !maxInMemTuplesLimitReached { - b.unlimitedAllocator.PerformAppend(b.bufferedTuples, func() { - b.bufferedTuples.AppendTuples(batch, startIdx, endIdx) - }) + b.bufferedTuples.AppendTuples(batch, startIdx, endIdx) return } // Not all tuples could be stored in-memory; they will have to be placed in diff --git a/pkg/sql/colexec/colexecutils/utils.go b/pkg/sql/colexec/colexecutils/utils.go index de8161a8e3dd..1982d9d0d7c4 100644 --- a/pkg/sql/colexec/colexecutils/utils.go +++ b/pkg/sql/colexec/colexecutils/utils.go @@ -76,6 +76,7 @@ func NewAppendOnlyBufferedBatch( batch := allocator.NewMemBatchWithFixedCapacity(typs, 0 /* capacity */) return &AppendOnlyBufferedBatch{ batch: batch, + allocator: allocator, colVecs: batch.ColVecs(), colsToStore: colsToStore, } @@ -97,6 +98,7 @@ type AppendOnlyBufferedBatch struct { // through the implementation of each method of coldata.Batch interface. batch coldata.Batch + allocator *colmem.Allocator length int colVecs []coldata.Vec colsToStore []int @@ -194,22 +196,24 @@ func (b *AppendOnlyBufferedBatch) String() string { // AppendTuples is a helper method that appends all tuples with indices in range // [startIdx, endIdx) from batch (paying attention to the selection vector) -// into b. -// NOTE: this does *not* perform memory accounting. +// into b. The newly allocated memory is registered with the allocator used to +// create this AppendOnlyBufferedBatch. // NOTE: batch must be of non-zero length. func (b *AppendOnlyBufferedBatch) AppendTuples(batch coldata.Batch, startIdx, endIdx int) { - for _, colIdx := range b.colsToStore { - b.colVecs[colIdx].Append( - coldata.SliceArgs{ - Src: batch.ColVec(colIdx), - Sel: batch.Selection(), - DestIdx: b.length, - SrcStartIdx: startIdx, - SrcEndIdx: endIdx, - }, - ) - } - b.length += endIdx - startIdx + b.allocator.PerformAppend(b, func() { + for _, colIdx := range b.colsToStore { + b.colVecs[colIdx].Append( + coldata.SliceArgs{ + Src: batch.ColVec(colIdx), + Sel: batch.Selection(), + DestIdx: b.length, + SrcStartIdx: startIdx, + SrcEndIdx: endIdx, + }, + ) + } + b.length += endIdx - startIdx + }) } // MaybeAllocateUint64Array makes sure that the passed in array is allocated, of diff --git a/pkg/sql/colexec/hash_aggregator.go b/pkg/sql/colexec/hash_aggregator.go index 0d41f8fae458..f5f1acf6b81d 100644 --- a/pkg/sql/colexec/hash_aggregator.go +++ b/pkg/sql/colexec/hash_aggregator.go @@ -229,11 +229,9 @@ func (op *hashAggregator) Next() coldata.Batch { switch op.state { case hashAggregatorBuffering: if op.bufferingState.pendingBatch != nil && op.bufferingState.unprocessedIdx < op.bufferingState.pendingBatch.Length() { - op.allocator.PerformAppend(op.bufferingState.tuples, func() { - op.bufferingState.tuples.AppendTuples( - op.bufferingState.pendingBatch, op.bufferingState.unprocessedIdx, op.bufferingState.pendingBatch.Length(), - ) - }) + op.bufferingState.tuples.AppendTuples( + op.bufferingState.pendingBatch, op.bufferingState.unprocessedIdx, op.bufferingState.pendingBatch.Length(), + ) } op.bufferingState.pendingBatch, op.bufferingState.unprocessedIdx = op.Input.Next(), 0 n := op.bufferingState.pendingBatch.Length() @@ -268,9 +266,7 @@ func (op *hashAggregator) Next() coldata.Batch { toBuffer = op.maxBuffered - op.bufferingState.tuples.Length() } if toBuffer > 0 { - op.allocator.PerformAppend(op.bufferingState.tuples, func() { - op.bufferingState.tuples.AppendTuples(op.bufferingState.pendingBatch, 0 /* startIdx */, toBuffer) - }) + op.bufferingState.tuples.AppendTuples(op.bufferingState.pendingBatch, 0 /* startIdx */, toBuffer) op.bufferingState.unprocessedIdx = toBuffer } if op.bufferingState.tuples.Length() == op.maxBuffered { diff --git a/pkg/sql/colexec/sort.go b/pkg/sql/colexec/sort.go index 0160f393586a..c2db9c884726 100644 --- a/pkg/sql/colexec/sort.go +++ b/pkg/sql/colexec/sort.go @@ -138,9 +138,7 @@ func (p *allSpooler) spool() { } p.spooled = true for batch := p.Input.Next(); batch.Length() != 0; batch = p.Input.Next() { - p.allocator.PerformAppend(p.bufferedTuples, func() { - p.bufferedTuples.AppendTuples(batch, 0 /* startIdx */, batch.Length()) - }) + p.bufferedTuples.AppendTuples(batch, 0 /* startIdx */, batch.Length()) } } diff --git a/pkg/sql/colexec/sort_chunks.go b/pkg/sql/colexec/sort_chunks.go index 418c4378c5df..9b330d19806a 100644 --- a/pkg/sql/colexec/sort_chunks.go +++ b/pkg/sql/colexec/sort_chunks.go @@ -429,10 +429,8 @@ func (s *chunker) buffer(start int, end int) { if start == end { return } - s.allocator.PerformAppend(s.bufferedTuples, func() { - s.exportState.numProcessedTuplesFromBatch = end - s.bufferedTuples.AppendTuples(s.batch, start, end) - }) + s.exportState.numProcessedTuplesFromBatch = end + s.bufferedTuples.AppendTuples(s.batch, start, end) } func (s *chunker) spool() { diff --git a/pkg/sql/colexec/sorttopk.go b/pkg/sql/colexec/sorttopk.go index 227cd3587287..efefe753ec1f 100644 --- a/pkg/sql/colexec/sorttopk.go +++ b/pkg/sql/colexec/sorttopk.go @@ -168,9 +168,7 @@ func (t *topKSorter) spool() { fromLength = int(remainingRows) } t.firstUnprocessedTupleIdx = fromLength - t.allocator.PerformAppend(t.topK, func() { - t.topK.AppendTuples(t.inputBatch, 0 /* startIdx */, fromLength) - }) + t.topK.AppendTuples(t.inputBatch, 0 /* startIdx */, fromLength) remainingRows -= uint64(fromLength) if fromLength == t.inputBatch.Length() { t.inputBatch = t.Input.Next() diff --git a/pkg/sql/colmem/allocator_test.go b/pkg/sql/colmem/allocator_test.go index cacc3042d931..fae04074dd2f 100644 --- a/pkg/sql/colmem/allocator_test.go +++ b/pkg/sql/colmem/allocator_test.go @@ -224,14 +224,19 @@ func TestPerformAppend(t *testing.T) { beforePerformOperation := testAllocator.Used() testAllocator.PerformOperation(batch1.ColVecs(), func() { - batch1.AppendTuples(inputBatch, 0 /* startIdx */, inputBatch.Length()) + for colIdx, destVec := range batch1.ColVecs() { + destVec.Append(coldata.SliceArgs{ + Src: inputBatch.ColVec(colIdx), + DestIdx: batch1.Length(), + SrcEndIdx: inputBatch.Length(), + }) + } + batch1.SetLength(batch1.Length() + inputBatch.Length()) }) afterPerformOperation := testAllocator.Used() beforePerformAppend := afterPerformOperation - testAllocator.PerformAppend(batch2, func() { - batch2.AppendTuples(inputBatch, 0 /* startIdx */, inputBatch.Length()) - }) + batch2.AppendTuples(inputBatch, 0 /* startIdx */, inputBatch.Length()) afterPerformAppend := testAllocator.Used() performOperationMem := afterPerformOperation - beforePerformOperation diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index 336e52f02589..185b71b2ba84 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -1877,6 +1877,7 @@ func TestLint(t *testing.T) { "--", "sql/col*", ":!sql/colexec/colexecutils/utils.go", + ":!sql/colmem/allocator_test.go", ) if err != nil { t.Fatal(err)