
[DO NOT MERGE] distsql: row batching #20621

Closed
wants to merge 2 commits

Conversation

asubiotto
Contributor

Draft work on row batching, with a benchmark. I thought it would be useful to share this given the discussion on #20555. Results:

name                      time/op
NextSteps/Normal-8        14.5ms ± 3%
NextSteps/ElideRowChan-8  11.3ms ± 5%
NextSteps/RowBatch-8      12.4ms ± 2%

On its own this benchmark is meaningless, since RowBatch is identical to ElideRowChan except for the row copies. Keep in mind that RowBatch needs to allocate those row copies, but this is a cost that occurs once per local flow and can be amortized.

More meaningful benchmarks would set up flows with remote communication, as well as longer flows with only local communication.

@asubiotto asubiotto requested review from a team December 11, 2017 18:56

@petermattis
Collaborator

Rather than set up a benchmark with remote communication, I'd rather see a benchmark which exercises a more complex local flow. See my comments below.

The fact that ElideRowChan is faster than RowBatch in this simplistic benchmark is what I would expect. It is going to be difficult to make anything which uses channels faster than direct function calls.
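To make that point concrete, here is a hypothetical micro-benchmark (not from this PR; all names are made up) contrasting a per-row channel handoff with a direct function call:

```go
package rowchan_test

import "testing"

type row [4]int64

func consume(r row) int64 { return r[0] }

// BenchmarkChannelPerRow pays a channel send/receive per row.
func BenchmarkChannelPerRow(b *testing.B) {
	ch := make(chan row, 16)
	done := make(chan int64)
	go func() {
		var sum int64
		for r := range ch {
			sum += consume(r)
		}
		done <- sum
	}()
	for i := 0; i < b.N; i++ {
		ch <- row{int64(i)}
	}
	close(ch)
	<-done
}

// BenchmarkDirectCall hands each row to the consumer with a plain call.
func BenchmarkDirectCall(b *testing.B) {
	var sum int64
	for i := 0; i < b.N; i++ {
		sum += consume(row{int64(i)})
	}
	_ = sum
}
```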




pkg/sql/distsqlrun/base.go, line 413 at r1 (raw file):

// Next is part of the RowSource interface. This implementation of Next() is not
// threadsafe.

I think RowChannel always has a single consumer. @RaduBerinde can answer definitively. We should update the commentary above RowChannel to indicate this.


pkg/sql/distsqlrun/tablereader_test.go, line 299 at r2 (raw file):

}

func BenchmarkNextSteps(b *testing.B) {

Two criticisms of this benchmark: the use of the tableReader will likely dominate the time in the benchmark, and there is only one stage in the flow, which is somewhat unrealistic. What I'd like to see is something that serves up synthetic data via NewRepeatableRowSource into a hashJoiner and from there to a count(*) aggregator.
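For illustration, the rough shape such a benchmark could take. Every identifier below is a stand-in, not the actual distsqlrun API; the hashJoiner stage is elided for brevity:

```go
package bench

// RowSource is a stand-in for the distsqlrun RowSource interface.
type RowSource interface {
	Next() (row []int64, ok bool)
}

// repeatableSource serves the same synthetic rows on every benchmark
// iteration, so setup cost is paid once, as NewRepeatableRowSource intends.
type repeatableSource struct {
	rows [][]int64
	idx  int
}

func (s *repeatableSource) Next() ([]int64, bool) {
	if s.idx == len(s.rows) {
		return nil, false
	}
	r := s.rows[s.idx]
	s.idx++
	return r, true
}

// Reset rewinds the source so the next iteration re-serves the same rows.
func (s *repeatableSource) Reset() { s.idx = 0 }

// countRows plays the role of the count(*) aggregator at the end of the
// flow, consuming whatever the (elided) join stage produces.
func countRows(src RowSource) int {
	n := 0
	for {
		if _, ok := src.Next(); !ok {
			return n
		}
		n++
	}
}
```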



@asubiotto
Contributor Author

asubiotto commented Dec 12, 2017

Set up a hashjoin to count(*) benchmark with all three setups:

benchstat flowresults
name                         time/op
JoinAndCount/Normal-8        403ms ± 3%
JoinAndCount/ElideRowChan-8  340ms ± 2%
JoinAndCount/RowBatching-8   203ms ± 5%

The benefit we're seeing here is due to allowing processors to run concurrently, amortizing row channel overhead, and "tighter" loops (no special processing was done in the aggregator).

cc @RaduBerinde @andreimatei

@petermattis
Collaborator

Thanks for adding these benchmarks. Sort of awkward to construct these flows manually, though. Would be nice if you could use Flow.setup, but I don't see how that is possible.




pkg/sql/distsqlrun/base_test.go, line 120 at r4 (raw file):

	// Input will serve as both left and right inputs. Every row will have the
	// same column value, thus producing inputSize^2 rows.
	const inputSize = 1000

Won't the output size be the input size given that you're joining on the single column? Perhaps I'm misunderstanding how HashJoinerSpec works.


pkg/sql/distsqlrun/base_test.go, line 181 at r4 (raw file):

				b.Fatal(err)
			}
			defer ag.Close(ctx)

You're setting up a ton of defers here, which will not run until b.Run returns. It's probably better to move this explicitly to after ag.RenderResults.
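A generic illustration of the pitfall (the resource type is hypothetical, not the PR's code): defers issued inside a benchmark loop only run when the enclosing function returns, so per-iteration cleanup should be explicit.

```go
package bench_test

import "testing"

type resource struct{}

func (r *resource) Close() {}

func newResource() *resource { return new(resource) }

// Bad: b.N deferred Closes accumulate and only run when the function exits.
func BenchmarkDeferPileup(b *testing.B) {
	for i := 0; i < b.N; i++ {
		r := newResource()
		defer r.Close()
	}
}

// Better: release each resource as soon as its work is done.
func BenchmarkExplicitClose(b *testing.B) {
	for i := 0; i < b.N; i++ {
		r := newResource()
		r.Close()
	}
}
```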


pkg/sql/distsqlrun/base_test.go, line 219 at r4 (raw file):

			var wg sync.WaitGroup
			go func() {
				wg.Add(1)

You need to run the wg.Add(1) outside of the goroutine, otherwise it can race with the call to wg.Wait().
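The correct ordering, sketched generically:

```go
package main

import "sync"

func main() {
	var wg sync.WaitGroup
	wg.Add(1) // must happen on the spawning side, before Wait can run
	go func() {
		defer wg.Done()
		// ... drive rows through the flow ...
	}()
	// If the Add were inside the goroutine, Wait could run first, observe a
	// zero counter, and return before the goroutine was ever counted.
	wg.Wait()
}
```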



@petermattis
Collaborator

With the fixes I mention below, the discrepancy between eliding and row-batching is much smaller on my machine:

name                         time/op
JoinAndCount/Normal-8        366ms ± 1%
JoinAndCount/ElideRowChan-8  183ms ± 1%
JoinAndCount/RowBatching-8   158ms ± 1%



pkg/sql/distsqlrun/aggregator.go, line 279 at r4 (raw file):

}

func (ag *aggregator) NextRow(ctx context.Context, scratch []byte, row sqlbase.EncDatumRow, meta ProducerMetadata) (cont bool, err error) {

Passing in scratch like this has no effect. We'd need to add scratch to aggregator (or return it from this method) for a real refactoring.


pkg/sql/distsqlrun/aggregator.go, line 288 at r4 (raw file):

			}
		}
	}()

This defer is now in the inner loop.



@petermattis
Collaborator

Another interesting data point is varying the row batch size:

name                             time/op
JoinAndCount/RowBatching=1-8     431ms ± 1%
JoinAndCount/RowBatching=2-8     330ms ± 1%
JoinAndCount/RowBatching=4-8     241ms ± 1%
JoinAndCount/RowBatching=16-8    187ms ± 1%
JoinAndCount/RowBatching=32-8    178ms ± 1%
JoinAndCount/RowBatching=64-8    173ms ± 2%
JoinAndCount/RowBatching=128-8   171ms ± 1%
JoinAndCount/RowBatching=256-8   163ms ± 1%
JoinAndCount/RowBatching=512-8   161ms ± 1%
JoinAndCount/RowBatching=1024-8  159ms ± 2%

So batching ~16 rows is equivalent to eliding the row channel. That's interesting, because the benchmark always uses full-size batches, which is optimistic.
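For reference, a sweep like this is easy to express with sub-benchmarks; runPipeline below is a hypothetical stand-in for the JoinAndCount driver:

```go
package bench_test

import (
	"fmt"
	"testing"
)

// runPipeline stands in for driving the JoinAndCount flow at one batch size.
func runPipeline(batchSize int) {
	batch := make([][]int64, 0, batchSize)
	for i := 0; i < 1<<12; i++ {
		batch = append(batch, []int64{int64(i)})
		if len(batch) == batchSize {
			batch = batch[:0] // hand the batch off and reuse the backing array
		}
	}
}

func BenchmarkJoinAndCount(b *testing.B) {
	for _, size := range []int{1, 2, 4, 16, 32, 64, 128, 256, 512, 1024} {
		b.Run(fmt.Sprintf("RowBatching=%d", size), func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				runPipeline(size)
			}
		})
	}
}
```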



@petermattis
Collaborator

Something the ElideRowChan benchmark doesn't capture is the opportunity to avoid allocations. One of the big costs in the current benchmark is that every row emitted from the hash joiner needs to be copied since we don't know when it will be consumed. If we modeled hashJoiner as a RowSource, we could avoid those allocations. Hacking this into the benchmark results in:

name                         time/op
JoinAndCount/ElideRowChan-8  135ms ± 4%

I'm not sure how to get similar benefits without eliding row channels. Possibly we could use pool allocation and return batches to the pool once they have been consumed.
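A minimal sketch of that pooling idea, with stand-in types (EncDatumRow here is just a placeholder, not the real sqlbase type):

```go
package rowbatch

import "sync"

type EncDatumRow []int64 // placeholder for sqlbase.EncDatumRow

// RowBatch carries a batch of rows between processors.
type RowBatch struct {
	Rows []EncDatumRow
}

var batchPool = sync.Pool{
	New: func() interface{} { return &RowBatch{Rows: make([]EncDatumRow, 0, 64)} },
}

// GetBatch returns a reusable batch, emptied but with its capacity intact.
func GetBatch() *RowBatch {
	b := batchPool.Get().(*RowBatch)
	b.Rows = b.Rows[:0]
	return b
}

// PutBatch recycles a batch; the consumer must be done with every row,
// since the backing memory will be handed to the next producer.
func PutBatch(b *RowBatch) {
	batchPool.Put(b)
}
```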



@petermattis
Collaborator

Some more profiling reveals another source of excessive allocation: creating a new hashMemRowBucketIterator on every call to hashJoiner.probeRow. Easy enough to fix. Benchmarks are great.
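The shape of that kind of fix, in a generic sketch (none of these names are the real ones): hoist the iterator onto the joiner and reset it per probe instead of allocating a fresh one.

```go
package sketch

type EncDatumRow []int64 // placeholder type

// bucketIterator walks the rows of one hash bucket.
type bucketIterator struct {
	rows []EncDatumRow
	idx  int
}

func (it *bucketIterator) reset(rows []EncDatumRow) {
	it.rows, it.idx = rows, 0
}

type joiner struct {
	// iter is reused across probeRow calls, avoiding one allocation per probe.
	iter bucketIterator
}

func (j *joiner) probeRow(bucket []EncDatumRow) {
	j.iter.reset(bucket)
	for ; j.iter.idx < len(j.iter.rows); j.iter.idx++ {
		_ = j.iter.rows[j.iter.idx] // ... match against the probe row, emit ...
	}
}
```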



@asubiotto
Contributor Author

Thanks for taking a look at this. It's definitely useful to have these benchmarks. I'll add them in separate PRs and see if I can construct them in a less manual way.

The big thing that stands out to me is the benefit that row channel elision gives us regarding allocations. We can try to approximate that with row batching by, as you say, pooling allocations and reusing batches once they're consumed, but it's nice to get that behavior by default.

I want to go back to the question of what we should focus on before the feature freeze. We definitely get a 50% performance improvement in this benchmark by using row batching. Row channel elision can provide better performance by removing unnecessary allocations, but I wonder how general that improvement can be, since it only applies when we plan processors synchronously on the same node. Consider the example in this benchmark: although it is realistic to a certain extent, a user running a COUNT(*) will probably have at least one other node involved in the query. The aggregator at the end will have to wait for the slowest node to return its last results (unless we implement online aggregation), so although local results will flow in much faster, remote flows will dominate latency. Row batching, on the other hand, improves those cases as well.

Additionally, row channel elision is going to be a huge change. It fundamentally changes how we think about processors and communication in distsql and requires some careful thinking when planning. I definitely don't see that change making it into 2.0. Row batching is a much less invasive change that we need anyway; it gives us significant performance improvements and requires far less work to show meaningful results (I'm thinking of COUNT(*)).

I would like to make a decision by the end of the day since the feature freeze is coming up pretty soon.

@knz
Contributor

knz commented Dec 21, 2017

IMHO row batching is an absolutely essential, always-good optimization. In comparison, channel elision is an incidental, sometimes-good optimization. Benchmarks are good, but we should not miss the forest for the trees. Progress on row batching must be prioritized, even if it doesn't yet fully work for 2.0.

@petermattis
Collaborator

IMHO row batching is an absolutely essential, always-good optimization. In comparison, channel elision is an incidental, sometimes-good optimization. Benchmarks are good, but we should not miss the forest for the trees. Progress on row batching must be prioritized, even if it doesn't yet fully work for 2.0.

Row batching is @asubiotto's primary focus. What additional prioritization would you give it? Discussed offline and not captured above is the current plan to do both row batching and channel elision for 2.0. Channel elision is going to open up other optimization opportunities such as reducing allocations which will have a very large impact as #20755 and #20759 hint at. I feel like I have a decent model of distsql execution in my head now and I'm pretty confident we should do both row batching and channel elision.

@knz
Contributor

knz commented Dec 21, 2017

I'm glad you discussed this offline. I was reacting to Alfonso's last comment on this PR. I was not aware of any further discussion after that.

@asubiotto
Contributor Author

The discussion was continued on #20555.
