Merge #68921
68921: colexecjoin: optimize merge/cross joins r=yuzefovich a=yuzefovich

**colexecjoin: make cross/merge join streaming with respect to left input**

This commit refactors the cross and merge joins to be streaming with
respect to the left input. Previously, we used two spilling queues
to fully consume both inputs before building the cross
product (in the case of the merge join, this is needed when building
from the buffered group).

That approach is suboptimal because buffering only one side is
sufficient, so this commit switches the cross join builder to operate in
a streaming fashion with respect to the left input. This is done by
building all result rows that correspond to the current left batch
before proceeding to the next left batch. This significantly reduces
the amount of copying and thus improves performance.

Fixes: #67816.

Release note: None
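
As a rough illustration of the streaming idea described in this commit message, the sketch below buffers only the right side and emits every output row for one left batch before asking for the next. It is not the actual `colexecjoin` code: `row`, `batchSource`, and `crossJoinStreaming` are hypothetical names, and the real operators work on columnar batches and a spilling queue rather than in-memory slices.

```go
package main

import "fmt"

// row is a stand-in for a tuple (hypothetical; the real code is columnar).
type row []int

// crossJoinStreaming keeps the whole right input buffered and consumes the
// left input one batch at a time. All output rows for the current left batch
// are emitted before the next left batch is requested, so the left side is
// never buffered.
func crossJoinStreaming(nextLeft func() []row, right []row, emit func(row)) {
	for batch := nextLeft(); len(batch) > 0; batch = nextLeft() {
		for _, l := range batch {
			for _, r := range right {
				out := make(row, 0, len(l)+len(r))
				out = append(out, l...)
				out = append(out, r...)
				emit(out)
			}
		}
	}
}

// batchSource returns a hypothetical batch iterator over the given batches.
// A zero-length batch signals the end of the input.
func batchSource(batches [][]row) func() []row {
	i := 0
	return func() []row {
		if i >= len(batches) {
			return nil
		}
		b := batches[i]
		i++
		return b
	}
}

func main() {
	left := batchSource([][]row{{{1}, {2}}, {{3}}})
	right := []row{{10}, {20}}
	var out []row
	crossJoinStreaming(left, right, func(r row) { out = append(out, r) })
	fmt.Println(out) // prints [[1 10] [1 20] [2 10] [2 20] [3 10] [3 20]]
}
```

Only the right rows are held across left batches here, which is the property the commit message relies on to cut down on copying.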

**colexecjoin: improve probing in the merge joiner with nulls**

For non-set-operation joins, whenever we have nulls in both columns we
can advance both pointers since neither of the rows will have a match.
This commit takes advantage of this observation and also refactors
the probing mechanism a bit, hopefully making it cleaner.

Release note: None
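
The null-handling observation can be sketched as a simplified inner-join probe over a single equality column. This is an assumption-laden toy, not the templated merge joiner: `nullableInt` and `probe` are hypothetical names, both inputs are assumed sorted with nulls first, and non-null keys are assumed unique on each side so that no group buffering is needed.

```go
package main

import "fmt"

// nullableInt is a hypothetical stand-in for one value of an equality column.
type nullableInt struct {
	val  int
	null bool
}

// probe walks two sorted inputs and returns the matching (left, right) index
// pairs. A null never equals anything, so when both current values are null
// both pointers can be advanced at once.
func probe(left, right []nullableInt) [][2]int {
	var matches [][2]int
	l, r := 0, 0
	for l < len(left) && r < len(right) {
		lv, rv := left[l], right[r]
		switch {
		case lv.null && rv.null:
			// Neither row can have a match: skip both.
			l++
			r++
		case lv.null:
			l++
		case rv.null:
			r++
		case lv.val < rv.val:
			l++
		case lv.val > rv.val:
			r++
		default:
			matches = append(matches, [2]int{l, r})
			l++
			r++
		}
	}
	return matches
}

func main() {
	null := nullableInt{null: true}
	left := []nullableInt{null, null, {val: 1}, {val: 3}}
	right := []nullableInt{null, {val: 1}, {val: 2}, {val: 3}}
	fmt.Println(probe(left, right)) // prints [[2 1] [3 3]]
}
```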

**colexecjoin: avoid buffering tuples from the right in merge joiner**

Depending on the join type, we don't need to fully buffer the tuples
from the right input in order to produce the output. Namely, for
set-operation joins we only need to know the number of right tuples,
whereas for LEFT SEMI and RIGHT ANTI the behavior of the builder for
the buffered group is known in advance.

Release note: None
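
To see why a count is enough for set-operation joins, consider the standard SQL semantics: for each group of equal keys, INTERSECT ALL emits min(left count, right count) rows and EXCEPT ALL emits the left count minus that minimum. The sketch below applies this to sorted integer inputs. `setOpMerge` is a hypothetical helper for illustration only and is not how the merge joiner's builder is structured.

```go
package main

import "fmt"

// setOpMerge computes INTERSECT ALL (except == false) or EXCEPT ALL
// (except == true) over two sorted inputs. For each left group it needs only
// the number of right tuples with the same key, never the tuples themselves.
func setOpMerge(left, right []int, except bool) []int {
	var out []int
	l, r := 0, 0
	for l < len(left) {
		key := left[l]
		numLeft := 0
		for l < len(left) && left[l] == key {
			l++
			numLeft++
		}
		// Skip right tuples that sort before the current key.
		for r < len(right) && right[r] < key {
			r++
		}
		// Count, but do not buffer, the right tuples in the group.
		numRight := 0
		for r < len(right) && right[r] == key {
			r++
			numRight++
		}
		n := numLeft
		if numRight < n {
			n = numRight
		}
		if except {
			n = numLeft - n
		}
		for i := 0; i < n; i++ {
			out = append(out, key)
		}
	}
	return out
}

func main() {
	left := []int{1, 1, 1, 2, 3}
	right := []int{1, 1, 3, 3, 4}
	fmt.Println(setOpMerge(left, right, false)) // prints [1 1 3]
	fmt.Println(setOpMerge(left, right, true))  // prints [1 2]
}
```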

**colexecjoin: remove a copy when buffering the right group**

Previously, before enqueueing the tuples from the right buffered group
into the spilling queue, we would perform a deep copy. This is overkill
because the spilling queue itself performs a deep copy. This commit
refactors the enqueueing code to modify the right batch directly to
include only the tuples from the group.

Release note: None
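
The reasoning about the redundant copy can be sketched with toy types. Here `batch`, `copyingQueue`, and `enqueueGroup` are hypothetical stand-ins (the real code uses `coldata.Batch` and the spilling queue), and narrowing a slice stands in for restricting the batch to the group's tuples. Since the queue copies on enqueue, narrowing the batch beforehand is enough and no separate deep copy is needed.

```go
package main

import "fmt"

// batch is a hypothetical single-column batch.
type batch struct {
	vals []int
}

// copyingQueue mimics a queue that deep-copies whatever is enqueued, which is
// the property of the spilling queue that the commit message relies on.
type copyingQueue struct {
	items [][]int
}

func (q *copyingQueue) enqueue(b batch) {
	cp := make([]int, len(b.vals))
	copy(cp, b.vals)
	q.items = append(q.items, cp)
}

// enqueueGroup narrows the batch to the tuples in [start, end) and enqueues
// it. No copy is made here; the only copy happens inside the queue.
func enqueueGroup(q *copyingQueue, b batch, start, end int) {
	b.vals = b.vals[start:end]
	q.enqueue(b)
}

func main() {
	q := &copyingQueue{}
	b := batch{vals: []int{5, 6, 7, 8}}
	enqueueGroup(q, b, 1, 3)
	// Later reuse of the batch's memory does not affect the enqueued data.
	b.vals[1] = 99
	fmt.Println(q.items[0]) // prints [6 7]
}
```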

Co-authored-by: Yahor Yuzefovich <[email protected]>
craig[bot] and yuzefovich committed Oct 30, 2021
2 parents 9c89a20 + 4999ac5 commit a15b840
Showing 22 changed files with 17,160 additions and 13,799 deletions.
2,505 changes: 1,213 additions & 1,292 deletions pkg/sql/colexec/colexecjoin/crossjoiner.eg.go

Large diffs are not rendered by default.

406 changes: 209 additions & 197 deletions pkg/sql/colexec/colexecjoin/crossjoiner.go

Large diffs are not rendered by default.

339 changes: 132 additions & 207 deletions pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go

Large diffs are not rendered by default.

52 changes: 24 additions & 28 deletions pkg/sql/colexec/colexecjoin/mergejoinbase.eg.go

Some generated files are not rendered by default.

12 changes: 4 additions & 8 deletions pkg/sql/colexec/colexecjoin/mergejoinbase_tmpl.go
@@ -57,17 +57,13 @@ func _ASSIGN_EQ(_, _, _, _, _, _ interface{}) int {
 // */}}

 // isBufferedGroupFinished checks to see whether or not the buffered group
-// corresponding to input continues in batch.
+// corresponding to the first tuple continues in batch.
 func (o *mergeJoinBase) isBufferedGroupFinished(
-input *mergeJoinInput, batch coldata.Batch, rowIdx int,
+input *mergeJoinInput, firstTuple []coldata.Vec, batch coldata.Batch, rowIdx int,
 ) bool {
 if batch.Length() == 0 {
 return true
 }
-bufferedGroup := o.bufferedGroup.left
-if input == &o.right {
-bufferedGroup = o.bufferedGroup.right
-}
 tupleToLookAtIdx := rowIdx
 sel := batch.Selection()
 if sel != nil {
@@ -89,7 +85,7 @@ func (o *mergeJoinBase) isBufferedGroupFinished(
 // right side being an input) this check will always return false since
 // nulls couldn't be buffered up though.
 // TODO(yuzefovich): consider templating this.
-bufferedNull := bufferedGroup.firstTuple[colIdx].MaybeHasNulls() && bufferedGroup.firstTuple[colIdx].Nulls().NullAt(0)
+bufferedNull := firstTuple[colIdx].MaybeHasNulls() && firstTuple[colIdx].Nulls().NullAt(0)
 incomingNull := batch.ColVec(int(colIdx)).MaybeHasNulls() && batch.ColVec(int(colIdx)).Nulls().NullAt(tupleToLookAtIdx)
 if o.joinType.IsSetOpJoin() {
 if bufferedNull && incomingNull {
@@ -100,7 +96,7 @@ func (o *mergeJoinBase) isBufferedGroupFinished(
 if bufferedNull || incomingNull {
 return true
 }
-bufferedCol := bufferedGroup.firstTuple[colIdx].TemplateType()
+bufferedCol := firstTuple[colIdx].TemplateType()
 prevVal := bufferedCol.Get(0)
 col := batch.ColVec(int(colIdx)).TemplateType()
 curVal := col.Get(tupleToLookAtIdx)