Skip to content

Commit

Permalink
colexecjoin: remove a copy when buffering the right group
Browse files Browse the repository at this point in the history
Previously, before enqueueing the tuples from the right buffered group
into the spilling queue, we would perform a deep copy. This is overkill
because the spilling queue itself performs the deep copy. This commit
refactors the enqueueing code to modify the right batch directly to
include only the tuples from the group.

Release note: None
  • Loading branch information
yuzefovich committed Oct 29, 2021
1 parent 74a0e8e commit 4999ac5
Showing 1 changed file with 39 additions and 25 deletions.
64 changes: 39 additions & 25 deletions pkg/sql/colexec/colexecjoin/mergejoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package colexecjoin

import (
"context"
"math"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
Expand Down Expand Up @@ -248,9 +247,8 @@ type mjBufferedGroupState struct {
// rightFirstTuple is the first tuple of the right buffered group. It is set
// only in case the right buffered group spans more than one input batch.
rightFirstTuple []coldata.Vec
// rightScratchBatch is a scratch space for copying the tuples out of the
// right input batches before enqueueing them into the spilling queue.
rightScratchBatch coldata.Batch
// scratchSel is a scratch selection vector initialized only when needed.
scratchSel []int

// helper is the building facility for the cross join of the buffered group.
helper *crossJoinerBase
Expand Down Expand Up @@ -699,29 +697,45 @@ func (o *mergeJoinBase) appendToRightBufferedGroup(sel []int, groupStartIdx int,
return
}

// We don't impose any memory limits on the scratch batch because we rely on
// the inputs to the merge joiner to produce reasonably sized batches.
const maxBatchMemSize = math.MaxInt64
o.bufferedGroup.rightScratchBatch, _ = o.unlimitedAllocator.ResetMaybeReallocate(
sourceTypes, o.bufferedGroup.rightScratchBatch, groupLength, maxBatchMemSize,
)
// TODO(yuzefovich): SpillingQueue.Enqueue deep-copies the batch too. Think
// through whether the copy here can be avoided altogether.
o.unlimitedAllocator.PerformOperation(o.bufferedGroup.rightScratchBatch.ColVecs(), func() {
for colIdx := range sourceTypes {
o.bufferedGroup.rightScratchBatch.ColVec(colIdx).Copy(
coldata.SliceArgs{
Src: o.proberState.rBatch.ColVec(colIdx),
Sel: sel,
DestIdx: 0,
SrcStartIdx: groupStartIdx,
SrcEndIdx: groupStartIdx + groupLength,
},
// Update the selection on the probing batch to only include tuples from the
// buffered group.
rBatch, rLength := o.proberState.rBatch, o.proberState.rLength
rSel := rBatch.Selection()
rBatchHasSel := rSel != nil
// No need to modify the batch if the whole batch is part of the buffered
// group.
needToModify := groupStartIdx != 0 || groupLength != rLength
if needToModify {
if rBatchHasSel {
// Since rBatch already has a selection vector which we'll be
// modifying, we need to copy the original.
o.bufferedGroup.scratchSel = colexecutils.EnsureSelectionVectorLength(o.bufferedGroup.scratchSel, rLength)
copy(o.bufferedGroup.scratchSel, rSel)
// Now we need to shift elements in range
// [groupStartIdx; groupStartIdx+groupLength) to the beginning of
// the selection vector and then update the length of the batch
// accordingly.
copy(rSel[:groupLength], rSel[groupStartIdx:groupStartIdx+groupLength])
rBatch.SetLength(groupLength)
} else {
// Since rBatch doesn't have a selection vector, we will set the
// selection vector to include tuples in range
// [groupStartIdx; groupStartIdx+groupLength).
colexecutils.UpdateBatchState(
rBatch, groupLength, true, /* usesSel */
colexecutils.DefaultSelectionVector[groupStartIdx:groupStartIdx+groupLength],
)
}
o.bufferedGroup.rightScratchBatch.SetLength(groupLength)
})
bufferedTuples.Enqueue(o.Ctx, o.bufferedGroup.rightScratchBatch)
}

bufferedTuples.Enqueue(o.Ctx, rBatch)

// If we had to modify the batch, then restore the original state now.
if needToModify {
colexecutils.UpdateBatchState(
rBatch, rLength, rBatchHasSel, o.bufferedGroup.scratchSel,
)
}
}

// sourceFinished returns true if either of input sources has no more rows.
Expand Down

0 comments on commit 4999ac5

Please sign in to comment.