From 002e669e3e85c3313266b497a74341af2dbae048 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 16 Aug 2021 21:12:53 -0700 Subject: [PATCH] colexecjoin: remove a copy when buffering the right group Previously, before enqueueing the tuples from the right buffered group into the spiling queue we would perform a deep-copy. This is an overkill because the spilling queue itself performs the deep copy. This commit refactors the enqueueing code to modify the right batch directly to include only the tuples from the group. Release note: None --- pkg/sql/colexec/colexecjoin/mergejoiner.go | 72 ++++++++++++++-------- 1 file changed, 46 insertions(+), 26 deletions(-) diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner.go b/pkg/sql/colexec/colexecjoin/mergejoiner.go index fd672069ca3d..bf99dd11b9e1 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner.go @@ -12,7 +12,6 @@ package colexecjoin import ( "context" - "math" "unsafe" "github.com/cockroachdb/cockroach/pkg/col/coldata" @@ -92,9 +91,8 @@ type mjBufferedGroupState struct { // rightFirstTuple is the first tuple of the right buffered group. It is set //only in case the right buffered group spans more than one input batch. rightFirstTuple []coldata.Vec - // rightScratchBatch is a scratch space for copying the tuples out of the - // right input batches before enqueueing them into the spilling queue. - rightScratchBatch coldata.Batch + // scratchSel is a scratch selection vector initialized only when needed. + scratchSel []int // helper is the building facility for the cross join of the buffered group. helper *crossJoinerBase @@ -605,29 +603,51 @@ func (o *mergeJoinBase) appendToRightBufferedGroup(sel []int, groupStartIdx int, return } - // We don't impose any memory limits on the scratch batch because we rely on - // the inputs to the merge joiner to produce reasonably sized batches. - const maxBatchMemSize = math.MaxInt64 - o.bufferedGroup.rightScratchBatch, _ = o.unlimitedAllocator.ResetMaybeReallocate( - sourceTypes, o.bufferedGroup.rightScratchBatch, groupLength, maxBatchMemSize, - ) - // TODO(yuzefovich): SpillingQueue.Enqueue deep-copies the batch too. Think - // through whether the copy here can be avoided altogether. - o.unlimitedAllocator.PerformOperation(o.bufferedGroup.rightScratchBatch.ColVecs(), func() { - for colIdx := range sourceTypes { - o.bufferedGroup.rightScratchBatch.ColVec(colIdx).Copy( - coldata.SliceArgs{ - Src: o.proberState.rBatch.ColVec(colIdx), - Sel: sel, - DestIdx: 0, - SrcStartIdx: groupStartIdx, - SrcEndIdx: groupStartIdx + groupLength, - }, - ) + // Update the selection on the probing batch to only include tuples from the + // buffered group. + rBatch, rLength := o.proberState.rBatch, o.proberState.rLength + rSel := rBatch.Selection() + rBatchHasSel := rSel != nil + // No need to modify the batch if the whole batch is part of the buffered + // group. + needToModify := groupStartIdx != 0 || groupLength != rLength + if needToModify { + if rBatchHasSel { + o.bufferedGroup.scratchSel = colexecutils.EnsureSelectionVectorLength(o.bufferedGroup.scratchSel, rLength) + copy(o.bufferedGroup.scratchSel, rSel) + for idx := 0; idx < groupLength; idx++ { + rSel[idx] = o.bufferedGroup.scratchSel[groupStartIdx+idx] + } + } else { + rBatch.SetSelection(true) + rSel = rBatch.Selection() + copy(rSel, defaultSelectionVector[groupStartIdx:groupStartIdx+groupLength]) } - o.bufferedGroup.rightScratchBatch.SetLength(groupLength) - }) - bufferedTuples.Enqueue(o.Ctx, o.bufferedGroup.rightScratchBatch) + rBatch.SetLength(groupLength) + } + + bufferedTuples.Enqueue(o.Ctx, rBatch) + + // If we had to modify the batch, then restore the original state now. + if needToModify { + if rBatchHasSel { + copy(rSel, o.bufferedGroup.scratchSel) + } else { + rBatch.SetSelection(false) + } + rBatch.SetLength(rLength) + } +} + +// defaultSelectionVector contains all integers in [0, coldata.MaxBatchSize) +// range. +var defaultSelectionVector []int + +func init() { + defaultSelectionVector = make([]int, coldata.MaxBatchSize) + for i := range defaultSelectionVector { + defaultSelectionVector[i] = i + } } // sourceFinished returns true if either of input sources has no more rows.