diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner.go b/pkg/sql/colexec/colexecjoin/mergejoiner.go index 353a2afc4d9b..79e492b64df3 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner.go @@ -12,7 +12,6 @@ package colexecjoin import ( "context" - "math" "unsafe" "github.com/cockroachdb/cockroach/pkg/col/coldata" @@ -92,9 +91,8 @@ type mjBufferedGroupState struct { // rightFirstTuple is the first tuple of the right buffered group. It is set // only in case the right buffered group spans more than one input batch. rightFirstTuple []coldata.Vec - // rightScratchBatch is a scratch space for copying the tuples out of the - // right input batches before enqueueing them into the spilling queue. - rightScratchBatch coldata.Batch + // scratchSel is a scratch selection vector initialized only when needed. + scratchSel []int // helper is the building facility for the cross join of the buffered group. helper *crossJoinerBase @@ -605,29 +603,56 @@ func (o *mergeJoinBase) appendToRightBufferedGroup(sel []int, groupStartIdx int, return } - // We don't impose any memory limits on the scratch batch because we rely on - // the inputs to the merge joiner to produce reasonably sized batches. - const maxBatchMemSize = math.MaxInt64 - o.bufferedGroup.rightScratchBatch, _ = o.unlimitedAllocator.ResetMaybeReallocate( - sourceTypes, o.bufferedGroup.rightScratchBatch, groupLength, maxBatchMemSize, - ) - // TODO(yuzefovich): SpillingQueue.Enqueue deep-copies the batch too. Think - // through whether the copy here can be avoided altogether. 
- o.unlimitedAllocator.PerformOperation(o.bufferedGroup.rightScratchBatch.ColVecs(), func() { - for colIdx := range sourceTypes { - o.bufferedGroup.rightScratchBatch.ColVec(colIdx).Copy( - coldata.SliceArgs{ - Src: o.proberState.rBatch.ColVec(colIdx), - Sel: sel, - DestIdx: 0, - SrcStartIdx: groupStartIdx, - SrcEndIdx: groupStartIdx + groupLength, - }, + // Update the selection on the probing batch to only include tuples from the + // buffered group. + rBatch, rLength := o.proberState.rBatch, o.proberState.rLength + rSel := rBatch.Selection() + rBatchHasSel := rSel != nil + // No need to modify the batch if the whole batch is part of the buffered + // group. + needToModify := groupStartIdx != 0 || groupLength != rLength + if needToModify { + if rBatchHasSel { + // Since rBatch already has a selection vector which we'll be + // modifying, we need to copy the original. + o.bufferedGroup.scratchSel = colexecutils.EnsureSelectionVectorLength(o.bufferedGroup.scratchSel, rLength) + copy(o.bufferedGroup.scratchSel, rSel) + // Now we need to shift elements in range + // [groupStartIdx; groupStartIdx+groupLength) to the beginning of + // the selection vector and then update the length of the batch + // accordingly. + copy(rSel[:groupLength], rSel[groupStartIdx:groupStartIdx+groupLength]) + rBatch.SetLength(groupLength) + } else { + // Since rBatch doesn't have a selection vector, we will set the + // selection vector to include tuples in range + // [groupStartIdx; groupStartIdx+groupLength). + colexecutils.UpdateBatchState( + rBatch, groupLength, true, /* usesSel */ + defaultSelectionVector[groupStartIdx:groupStartIdx+groupLength], ) } - o.bufferedGroup.rightScratchBatch.SetLength(groupLength) - }) - bufferedTuples.Enqueue(o.Ctx, o.bufferedGroup.rightScratchBatch) + } + + bufferedTuples.Enqueue(o.Ctx, rBatch) + + // If we had to modify the batch, then restore the original state now. 
+ if needToModify { + colexecutils.UpdateBatchState( + rBatch, rLength, rBatchHasSel, o.bufferedGroup.scratchSel, + ) + } +} + +// defaultSelectionVector contains all integers in [0, coldata.MaxBatchSize) +// range. +var defaultSelectionVector []int + +func init() { + defaultSelectionVector = make([]int, coldata.MaxBatchSize) + for i := range defaultSelectionVector { + defaultSelectionVector[i] = i + } } // sourceFinished returns true if either of input sources has no more rows.