colexecjoin: optimize merge/cross joins #68921

Merged
merged 4 commits into from
Oct 30, 2021

Conversation

yuzefovich
Copy link
Member

@yuzefovich yuzefovich commented Aug 13, 2021

colexecjoin: make cross/merge join streaming with regards to left input

This commit refactors the cross and merge join to be streaming with
regard to the left input. Previously, we were using two spilling queues
to consume both inputs first before proceeding to build the cross
product (in case of the merge join this is needed when building from the
buffered group).

That approach is suboptimal because buffering only one side is
sufficient, so this commit switches the cross join builder to operate in
a streaming fashion with regard to the left input. This is done by
building all result rows that correspond to the current left batch
before proceeding to the next left batch. It significantly reduces the
amount of copying and thus improves performance.

Fixes: #67816.

Release note: None
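
To make the idea concrete, here is a minimal sketch of a cross join that buffers only the right side and streams the left side batch by batch. It is not the code from this PR: `streamingCrossJoin`, `batchesOf`, and the `row` type are hypothetical names, plain `int` slices stand in for columnar batches, and an empty batch is assumed to signal the end of the left input.

```go
package main

import "fmt"

// row pairs one left value with one right value of the cross product.
type row struct{ left, right int }

// streamingCrossJoin takes the fully buffered right input and consumes the
// left input one batch at a time. Every result row for the current left batch
// is emitted before the next left batch is requested, so the left side never
// has to be buffered. An empty batch marks the end of the left input.
func streamingCrossJoin(nextLeft func() []int, right []int, emit func(row)) {
	for batch := nextLeft(); len(batch) > 0; batch = nextLeft() {
		for _, l := range batch {
			for _, r := range right {
				emit(row{l, r})
			}
		}
	}
}

// batchesOf returns an iterator over a fixed list of batches (a hypothetical
// helper used only to drive the example).
func batchesOf(batches ...[]int) func() []int {
	i := 0
	return func() []int {
		if i >= len(batches) {
			return nil
		}
		b := batches[i]
		i++
		return b
	}
}

func main() {
	var out []row
	left := batchesOf([]int{1, 2}, []int{3})
	streamingCrossJoin(left, []int{10, 20}, func(r row) { out = append(out, r) })
	fmt.Println(len(out), out[0], out[len(out)-1]) // prints "6 {1 10} {3 20}"
}
```

Only the right slice is held in memory for the whole run; each left batch can be dropped as soon as its rows are emitted.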

colexecjoin: improve probing in the merge joiner with nulls

For non-set-operation joins, whenever we have nulls in both columns we
can advance both pointers since neither of the rows will have a match.
This commit takes advantage of this observation and also refactors the
probing mechanism a bit, hopefully making it cleaner.

Release note: None
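
The observation about nulls can be illustrated with a simplified probe loop. This is only a sketch under stated assumptions: `nullableInt` and `probe` are hypothetical names, the inputs are single sorted columns with unique non-null keys (so there is no group buffering), and the join is an ordinary equality join rather than a set operation.

```go
package main

import "fmt"

// nullableInt is a hypothetical stand-in for a column value that may be NULL.
type nullableInt struct {
	val  int
	null bool
}

// probe walks two sorted inputs and returns the (left, right) index pairs
// that match on equality. NULL never equals anything in a regular join, so
// when both current values are NULL both cursors advance in a single step.
func probe(left, right []nullableInt) [][2]int {
	var matches [][2]int
	l, r := 0, 0
	for l < len(left) && r < len(right) {
		lv, rv := left[l], right[r]
		switch {
		case lv.null && rv.null:
			// Neither row can have a match: skip both at once.
			l++
			r++
		case lv.null:
			l++
		case rv.null:
			r++
		case lv.val < rv.val:
			l++
		case lv.val > rv.val:
			r++
		default:
			matches = append(matches, [2]int{l, r})
			l++
			r++
		}
	}
	return matches
}

func main() {
	null := nullableInt{null: true}
	left := []nullableInt{null, {val: 1}, {val: 3}}
	right := []nullableInt{null, null, {val: 1}, {val: 2}, {val: 3}}
	fmt.Println(probe(left, right)) // prints "[[1 2] [2 4]]"
}
```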

colexecjoin: avoid buffering tuples from the right in merge joiner

Depending on the join type, we don't need to fully buffer the tuples
from the right input in order to produce the output. Namely, for
set-operation joins we only need to know the number of right tuples
whereas for LEFT SEMI and RIGHT ANTI we know exactly the behavior of the
builder for the buffered group.

Release note: None
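
For the set-operation case, the reason a count is enough follows from SQL semantics: within a group of identical rows, INTERSECT ALL emits the smaller of the two counts and EXCEPT ALL emits the left count minus the right count (never below zero). A small sketch, with `setOp` and `setOpOutputCount` as hypothetical names that do not appear in the PR:

```go
package main

import "fmt"

// setOp enumerates the two ALL set operations discussed here.
type setOp int

const (
	intersectAll setOp = iota
	exceptAll
)

// setOpOutputCount returns how many rows to emit for a group of numLeft
// identical left rows when the right side holds numRight identical rows.
// Only the number of right rows matters, not their contents, which is why
// the right tuples do not need to be buffered.
func setOpOutputCount(op setOp, numLeft, numRight int) int {
	switch op {
	case intersectAll:
		if numRight < numLeft {
			return numRight
		}
		return numLeft
	case exceptAll:
		if numLeft > numRight {
			return numLeft - numRight
		}
		return 0
	default:
		panic("unsupported set operation")
	}
}

func main() {
	// Two identical rows on the left, one on the right.
	fmt.Println(setOpOutputCount(intersectAll, 2, 1)) // prints "1"
	fmt.Println(setOpOutputCount(exceptAll, 2, 1))    // prints "1"
}
```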

colexecjoin: remove a copy when buffering the right group

Previously, before enqueueing the tuples from the right buffered group
into the spilling queue we would perform a deep copy. This is overkill
because the spilling queue itself performs the deep copy. This commit
refactors the enqueueing code to modify the right batch directly to
include only the tuples from the group.

Release note: None

@yuzefovich yuzefovich added the do-not-merge bors won't merge a PR with this label. label Aug 13, 2021
@cockroach-teamcity
Copy link
Member

This change is Reviewable

@yuzefovich yuzefovich force-pushed the streaming-cross-join branch 7 times, most recently from 91830e5 to 3097c6b Compare August 17, 2021 04:14
@yuzefovich yuzefovich changed the title WIP on making cross/merge join streaming colexecjoin: optimize merge/cross joins Aug 17, 2021
@yuzefovich yuzefovich force-pushed the streaming-cross-join branch from 3097c6b to 28382ff Compare August 17, 2021 04:45
@yuzefovich
Copy link
Member Author

The benchmarks are here.

@yuzefovich yuzefovich removed the do-not-merge bors won't merge a PR with this label. label Aug 17, 2021
@yuzefovich yuzefovich marked this pull request as ready for review August 17, 2021 04:55
@yuzefovich yuzefovich requested review from michae2 and a team August 17, 2021 04:55
@yuzefovich
Copy link
Member Author

@michae2 tagging you as the main reviewer, but if anyone else is interested in diving into the vectorized merge joiner - please let me know :) (I think only Jordan had some context earlier and probably nobody but me still has it :/ )

@yuzefovich
Copy link
Member Author

Rebased on top of master. PTAL.

@yuzefovich
Copy link
Member Author

cc @rytaft @cucaroach

@yuzefovich yuzefovich force-pushed the streaming-cross-join branch from 002e669 to 5ad9821 Compare October 6, 2021 03:19
@yuzefovich
Copy link
Member Author

Rebased on top of master just in case.

@yuzefovich yuzefovich force-pushed the streaming-cross-join branch 2 times, most recently from 1e76b8a to 36f761d Compare October 6, 2021 23:11
Copy link
Collaborator

@michae2 michae2 left a comment

Still reading, just noticed one thing so far.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @michae2 and @yuzefovich)


pkg/sql/colexec/colexecjoin/crossjoiner.go, line 45 at r14 (raw file):

"set-op cross joins are invalid"

Are you sure this is true? 😈 For example:

CREATE TABLE t ();
CREATE TABLE u ();
INSERT INTO t DEFAULT VALUES;
INSERT INTO t DEFAULT VALUES;
INSERT INTO u DEFAULT VALUES;
SELECT * FROM t INTERSECT ALL SELECT * FROM u;

Looks like it uses cross join:

[email protected]:26257/defaultdb> EXPLAIN SELECT * FROM t INTERSECT ALL SELECT * FROM u;
                                        info
------------------------------------------------------------------------------------
  distribution: local
  vectorized: true

  • intersect all
  │
  ├── • scan
  │     estimated row count: 2 (100% of the table; stats collected 6 seconds ago)
  │     table: t@t_pkey
  │     spans: FULL SCAN
  │
  └── • scan
        missing stats
        table: u@u_pkey
        spans: FULL SCAN
(14 rows)


Time: 1ms total (execution 1ms / network 0ms)

[email protected]:26257/defaultdb> EXPLAIN (VEC) SELECT * FROM t INTERSECT ALL SELECT * FROM u;
               info
----------------------------------
  │
  └ Node 1
    └ *colexecjoin.crossJoiner
      ├ *colfetcher.ColBatchScan
      └ *colfetcher.ColBatchScan
(5 rows)

@yuzefovich yuzefovich force-pushed the streaming-cross-join branch from 36f761d to 1f3f7e2 Compare October 16, 2021 01:48
@yuzefovich yuzefovich requested a review from a team as a code owner October 16, 2021 01:48
@yuzefovich yuzefovich force-pushed the streaming-cross-join branch from 1f3f7e2 to 75dfd99 Compare October 16, 2021 01:52
Copy link
Member Author

@yuzefovich yuzefovich left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @michae2)


pkg/sql/colexec/colexecjoin/crossjoiner.go, line 45 at r14 (raw file):

Previously, michae2 (Michael Erickson) wrote…

"set-op cross joins are invalid"

Are you sure this is true? 😈 For example:

CREATE TABLE t ();
CREATE TABLE u ();
INSERT INTO t DEFAULT VALUES;
INSERT INTO t DEFAULT VALUES;
INSERT INTO u DEFAULT VALUES;
SELECT * FROM t INTERSECT ALL SELECT * FROM u;

Looks like it uses cross join:

[email protected]:26257/defaultdb> EXPLAIN SELECT * FROM t INTERSECT ALL SELECT * FROM u;
                                        info
------------------------------------------------------------------------------------
  distribution: local
  vectorized: true

  • intersect all
  │
  ├── • scan
  │     estimated row count: 2 (100% of the table; stats collected 6 seconds ago)
  │     table: t@t_pkey
  │     spans: FULL SCAN
  │
  └── • scan
        missing stats
        table: u@u_pkey
        spans: FULL SCAN
(14 rows)


Time: 1ms total (execution 1ms / network 0ms)

[email protected]:26257/defaultdb> EXPLAIN (VEC) SELECT * FROM t INTERSECT ALL SELECT * FROM u;
               info
----------------------------------
  │
  └ Node 1
    └ *colexecjoin.crossJoiner
      ├ *colfetcher.ColBatchScan
      └ *colfetcher.ColBatchScan
(5 rows)

Indeed, nice catch! Fixed.

It's interesting that the set-op cross join occurs only when the tables don't have any visible columns and the query doesn't explicitly specify hidden columns to be returned.

Copy link
Member Author

@yuzefovich yuzefovich left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @michae2 and @rytaft)


pkg/sql/colexec/colexecjoin/crossjoiner.go, line 199 at r34 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

Isn't this assignment already happening inside prepareForNextLeftBatch?

Done.


pkg/sql/colexec/colexecjoin/crossjoiner.go, line 204 at r34 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

Isn't this assignment already happening inside prepareForNextLeftBatch?

Done.


pkg/sql/colexec/colexecjoin/crossjoiner.go, line 220 at r34 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

Don't you always want to call this function after calling c.inputOne.Next()? (If the startIdx, c.numRightTuples in this case, is greater than the endIdx, you could potentially return early from that function.)

Hm, I think it is possible to refactor the code so that we could call prepareForNextLeftBatch with startIdx > endIdx. However, I think it'll be more confusing overall.

My thinking for the current code is as follows: for EXCEPT ALL we perform the "subtraction" of the right rows from the left rows by skipping the first len(right rows) rows from the left side. We do so explicitly here in setupForBuilding, and for all these "subtracted" rows we don't have any output to emit, so there is no point in calling prepareForNextLeftBatch on these "subtracted" batches.
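
The skipping described above can be sketched as follows. This is an illustration only, not the PR's `setupForBuilding`: `span` and `exceptAllSpans` are hypothetical names, and left batches are represented just by their lengths. Batches that are subtracted entirely are skipped without producing a range, which mirrors why there is nothing to prepare for them.

```go
package main

import "fmt"

// span says which rows [startIdx, endIdx) of left batch batchIdx still
// produce output after the EXCEPT ALL subtraction.
type span struct{ batchIdx, startIdx, endIdx int }

// exceptAllSpans drops the first numRight rows across the left batches and
// returns the remaining range for every batch that still has output.
func exceptAllSpans(batchLens []int, numRight int) []span {
	var spans []span
	toSkip := numRight
	for i, n := range batchLens {
		if toSkip >= n {
			// The whole batch is "subtracted": nothing to emit from it.
			toSkip -= n
			continue
		}
		spans = append(spans, span{batchIdx: i, startIdx: toSkip, endIdx: n})
		toSkip = 0
	}
	return spans
}

func main() {
	// Left batches of 3, 3, and 2 rows; 4 rows on the right.
	fmt.Println(exceptAllSpans([]int{3, 3, 2}, 4)) // prints "[{1 1 3} {2 0 2}]"
}
```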


pkg/sql/colexec/colexecjoin/crossjoiner.go, line 397 at r34 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

It seems like you could simplify some logic in other places if you have this function call c.inputOne.Next(). Then you only need to pass in the startIdx, since it seems like you never call this function with endIdx equal to anything other than batch.Length().

If it's helpful, you could have this function return the batch length.

Edit: If this isn't possible (e.g., seems like you might need to call this function from the merge joiner?), maybe you should just create a wrapper around this function that does some of what I'm suggesting.

Yeah, we cannot do that for the merge joiner, but I like your suggestion, so I added a helper that is now used in most places, thanks!


pkg/sql/colexec/colexecjoin/mergejoiner.go, line 150 at r31 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

Thank you for this comment!! So much clearer now! 👏👏👏

😃

Copy link
Collaborator

@rytaft rytaft left a comment

:lgtm: great work! (But maybe wait for @michae2's approval too)

Reviewed 18 of 18 files at r35, 13 of 13 files at r36, 4 of 4 files at r38, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @michae2 and @rytaft)


pkg/sql/colexec/colexecjoin/crossjoiner.go, line 220 at r34 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Hm, I think it is possible to refactor the code so that we could call prepareForNextLeftBatch with startIdx > endIdx. However, I think it'll be more confusing overall.

My thinking for the current code is as follows: for EXCEPT ALL we perform the "subtraction" of the right rows from the left rows by skipping the first len(right rows) rows from the left side. We do so explicitly here in setupForBuilding, and for all these "subtracted" rows we don't have any output to emit, so there is no point in calling prepareForNextLeftBatch on these "subtracted" batches.

Ok, makes sense

Copy link
Collaborator

@michae2 michae2 left a comment

This gives me a headache, but it's also exciting. 🙂 One day we could go even further into headache land and turn this into block nested loop join for fewer iterations of the right side.

Still reading, but I had a few questions. Sorry I'm so slow.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @michae2, @rytaft, and @yuzefovich)


pkg/sql/colexec/colexecjoin/crossjoiner.go, line 118 at r35 (raw file):

c.buildFromLeftInput(c.Ctx, 0 /* destStartIdx */)

Do I understand it correctly that buildFromLeftInput might not fill the entire output batch (if it reaches the end of the current left batch), but buildFromRightInput will always fill the entire output batch (unless it reaches the end of the join results)?

It seems like we account for this possibility with c.output.SetLength(willEmit) below, but by the time we do that, builderState.right has already been updated as if the entire output batch was filled.

Seems like we need to pass willEmit into buildFromRightInput? Or maybe more simply call c.output.SetLength(willEmit) before calling buildFromRightInput?

Hopefully I'm just misunderstanding.


pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go, line 207 at r35 (raw file):

				currentBatch := bs.right.currentBatch
				if currentBatch == nil {
					currentBatch, err = b.rightTuples.Dequeue(ctx)

Question: Do we not have to worry about currentBatch.Selection() on the right side because SpillingQueue.Enqueue() only keeps selected rows?

Copy link
Member Author

@yuzefovich yuzefovich left a comment

One day we could go even further into headache land and turn this into block nested loop join for fewer iterations of the right side.

Oh, I'm already scared of that complexity lol.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @michae2 and @rytaft)


pkg/sql/colexec/colexecjoin/crossjoiner.go, line 118 at r35 (raw file):

Previously, michae2 (Michael Erickson) wrote…
c.buildFromLeftInput(c.Ctx, 0 /* destStartIdx */)

Do I understand it correctly that buildFromLeftInput might not fill the entire output batch (if it reaches the end of the current left batch), but buildFromRightInput will always fill the entire output batch (unless it reaches the end of the join results)?

It seems like we account for this possibility with c.output.SetLength(willEmit) below, but by the time we do that, builderState.right has already been updated as if the entire output batch was filled.

Seems like we need to pass willEmit into buildFromRightInput? Or maybe more simply call c.output.SetLength(willEmit) before calling buildFromRightInput?

Hopefully I'm just misunderstanding.

There are two exit conditions for both of the buildFrom*Input methods:

  • we are always making sure that we never exceed c.output.Capacity()
  • and we specify the precise values of setup.leftNumRepeats and setup.rightNumRepeats.

I think in the example scenario you mention, when the cross product with the current left batch doesn't fill the output up to capacity, rightNumRepeats will be set in such a manner that bs.right.numRepeatsIdx < bs.setup.rightNumRepeats will be false, so we'll exit out of buildFromRightInput before filling up to the capacity.

The rightNumRepeats is set up in prepareForNextLeftBatch (it depends on each left batch) whereas leftNumRepeats is set in setupLeftBuilder (it depends on the whole right input).
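
A rough sketch of the two exit conditions for the right side, assuming that the right tuples are replayed once per row of the current left batch. `rightState` and `buildFromRight` are hypothetical simplifications (the field name `numRepeatsIdx` is borrowed from the discussion, everything else is invented for illustration), and an `int` slice stands in for the buffered right tuples.

```go
package main

import "fmt"

// rightState remembers where the previous call stopped so that building can
// resume when the output batch filled up mid-pass.
type rightState struct {
	numRepeatsIdx int // number of completed passes over the right tuples
	curIdx        int // position inside the current pass
}

// buildFromRight appends right tuples to a fresh output batch. It stops either
// when the output reaches capacity or when rightNumRepeats passes over the
// right tuples have been completed, whichever comes first.
func buildFromRight(right []int, st *rightState, rightNumRepeats, capacity int) []int {
	out := make([]int, 0, capacity)
	for st.numRepeatsIdx < rightNumRepeats {
		for st.curIdx < len(right) {
			if len(out) == capacity {
				// Exit condition 1: never exceed the output capacity.
				return out
			}
			out = append(out, right[st.curIdx])
			st.curIdx++
		}
		st.curIdx = 0
		st.numRepeatsIdx++
	}
	// Exit condition 2: all repeats are done, possibly short of capacity.
	return out
}

func main() {
	st := &rightState{}
	right := []int{7, 8, 9}
	fmt.Println(buildFromRight(right, st, 2, 4)) // prints "[7 8 9 7]"
	fmt.Println(buildFromRight(right, st, 2, 4)) // prints "[8 9]"
}
```

The second call returns fewer rows than the capacity because the repeat count for the current left batch is exhausted, which is the situation asked about in the review question.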


pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go, line 207 at r35 (raw file):

Previously, michae2 (Michael Erickson) wrote…

Question: Do we not have to worry about currentBatch.Selection() on the right side because SpillingQueue.Enqueue() only keeps selected rows?

Yes, exactly - that's the contract of the spilling queue. Do you think it's worth adding a comment about that?
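
The contract mentioned here can be pictured with a toy queue. This is not the real spilling queue or its API: `batch` and `toyQueue` are hypothetical in-memory stand-ins with a single `int` column. The point is only that when Enqueue copies just the selected rows, a dequeued batch is dense and the consumer does not need to consult a selection vector.

```go
package main

import "fmt"

// batch is a toy single-column batch. A non-nil sel lists the indices of the
// selected rows; length is the number of selected rows.
type batch struct {
	values []int
	sel    []int
	length int
}

// toyQueue is a hypothetical in-memory stand-in for a spilling queue.
type toyQueue struct{ items []batch }

// Enqueue deep-copies only the selected rows, so stored batches never carry a
// selection vector.
func (q *toyQueue) Enqueue(b batch) {
	dense := make([]int, 0, b.length)
	if b.sel != nil {
		for _, idx := range b.sel[:b.length] {
			dense = append(dense, b.values[idx])
		}
	} else {
		dense = append(dense, b.values[:b.length]...)
	}
	q.items = append(q.items, batch{values: dense, length: b.length})
}

// Dequeue returns the oldest batch, or false when the queue is empty.
func (q *toyQueue) Dequeue() (batch, bool) {
	if len(q.items) == 0 {
		return batch{}, false
	}
	b := q.items[0]
	q.items = q.items[1:]
	return b, true
}

func main() {
	q := &toyQueue{}
	q.Enqueue(batch{values: []int{10, 20, 30, 40}, sel: []int{1, 3}, length: 2})
	b, _ := q.Dequeue()
	fmt.Println(b.values, b.sel == nil) // prints "[20 40] true"
}
```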

Copy link
Collaborator

@michae2 michae2 left a comment

Reviewed 1 of 21 files at r1, 2 of 20 files at r19, 2 of 17 files at r27, 3 of 18 files at r35.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @michae2, @rytaft, and @yuzefovich)


pkg/sql/colexec/colexecjoin/crossjoiner.go, line 118 at r35 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

There are two exit conditions for both of the buildFrom*Input methods:

  • we are always making sure that we never exceed c.output.Capacity()
  • and we specify the precise values of setup.leftNumRepeats and setup.rightNumRepeats.

I think in the example scenario you mention, when the cross product with the current left batch doesn't fill the output up to capacity, rightNumRepeats will be set in such a manner that bs.right.numRepeatsIdx < bs.setup.rightNumRepeats will be false, so we'll exit out of buildFromRightInput before filling up to the capacity.

The rightNumRepeats is set up in prepareForNextLeftBatch (it depends on each left batch) whereas leftNumRepeats is set in setupLeftBuilder (it depends on the whole right input).

Thank you!


pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go, line 158 at r35 (raw file):

Quoted 8 lines of code…
		if colIdx < len(b.left.types)-1 {
			// Transition to building the next column with the same
			// initial builder state as the current column.



			*bs = initialBuilderState
		}

nit: Instead of bs := &b.builderState.left at the top, could we use bs := initialBuilderState in the same place that we create outStartIdx so that it's clear we're operating on a copy for each column? (Then we would have to do the assignment back to b.builderState.left at the end.)


pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go, line 207 at r35 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Yes, exactly - that's the contract of the spilling queue. Do you think it's worth adding a comment about that?

No, it's ok, just asking.

@yuzefovich yuzefovich force-pushed the streaming-cross-join branch 2 times, most recently from e0b3a3f to 4adbfc2 Compare October 22, 2021 20:24
Copy link
Member Author

@yuzefovich yuzefovich left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @michae2 and @rytaft)


pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go, line 158 at r35 (raw file):

Previously, michae2 (Michael Erickson) wrote…
		if colIdx < len(b.left.types)-1 {
			// Transition to building the next column with the same
			// initial builder state as the current column.



			*bs = initialBuilderState
		}

nit: Instead of bs := &b.builderState.left at the top, could we use bs := initialBuilderState in the same place that we create outStartIdx so that it's clear we're operating on a copy for each column? (Then we would have to do the assignment back to b.builderState.left at the end.)

Hm, yeah, I think it'd be clearer this way, thanks. I also refactored the code a bit more, and it looks much simpler now.

@yuzefovich yuzefovich force-pushed the streaming-cross-join branch from 4adbfc2 to ec8e79e Compare October 22, 2021 21:31
Copy link
Member Author

@yuzefovich yuzefovich left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @michae2 and @rytaft)


pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go, line 158 at r35 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Hm, yeah, I think it'd be clearer this way, thanks. I also refactored the code a bit more, and it looks much simpler now.

Man, these copies on each column show up noticeably in the microbenchmarks:

name                                                       old alloc/op   new alloc/op     delta
CrossJoiner/spillForced=false/type=INNER/rows=1-24           93.0kB ± 0%      91.1kB ± 0%      -2.08%  (p=0.000 n=10+10)
CrossJoiner/spillForced=false/type=INNER/rows=16-24           113kB ± 1%       110kB ± 0%      -2.39%  (p=0.000 n=10+9)
CrossJoiner/spillForced=false/type=INNER/rows=256-24          188kB ± 0%       184kB ± 0%      -2.16%  (p=0.000 n=10+10)
CrossJoiner/spillForced=false/type=INNER/rows=2048-24         339kB ± 0%       778kB ± 0%    +129.36%  (p=0.000 n=10+9)
CrossJoiner/spillForced=false/type=INNER/rows=8192-24         863kB ± 0%      8915kB ± 0%    +933.35%  (p=0.000 n=9+10)
ROUTINE ======================== github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecjoin.(*crossJoinerBase).buildFromLeftInput.func1 in /Users/yuzefovich/go/src/github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecjoin/crossjoiner.eg.go
  760.52MB   760.52MB (flat, cum) 93.27% of Total
         .          .    692:					var srcStartIdx int
         .          .    693:					// Loop over every column.
         .          .    694:					for colIdx := range b.left.types {
         .          .    695:						// Get a fresh copy of the builder state so that each column has a fresh
         .          .    696:						// start.
  760.52MB   760.52MB    697:						bs := b.builderState.left
         .          .    698:						if colIdx == len(b.left.types)-1 {
         .          .    699:							// This is the last column. We need to update the builder state so
         .          .    700:							// that we continue where we left off on the next call.
         .          .    701:							defer func() {
         .          .    702:								b.builderState.left = bs

So I partially reverted the change. However, overall I think it has gotten cleaner.

@yuzefovich
Copy link
Member Author

BTW I updated the benchmarks (here).

@yuzefovich yuzefovich force-pushed the streaming-cross-join branch from ec8e79e to 4999ac5 Compare October 29, 2021 00:29
@yuzefovich
Copy link
Member Author

Rebased on top of master to remove the merge conflict in the fourth commit.

Copy link
Collaborator

@michae2 michae2 left a comment

:lgtm: nice work!

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @michae2 and @rytaft)

@yuzefovich
Copy link
Member Author

Thanks for the reviews!

bors r+

@craig
Copy link
Contributor

craig bot commented Oct 30, 2021

Build failed:

@yuzefovich
Copy link
Member Author

A flake.

bors r+

@craig
Copy link
Contributor

craig bot commented Oct 30, 2021

Build failed:

@yuzefovich
Copy link
Member Author

Same flake.

bors r+

@craig
Copy link
Contributor

craig bot commented Oct 30, 2021

Build succeeded:

@craig craig bot merged commit a15b840 into cockroachdb:master Oct 30, 2021
@yuzefovich yuzefovich deleted the streaming-cross-join branch October 30, 2021 18:42
Development

Successfully merging this pull request may close these issues.

colexec: suboptimal buffering behavior in cross and merge joins
4 participants