colexec: take advantage of partial ordering in the hash aggregator #71121
Conversation
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @rytaft and @yuzefovich)
pkg/sql/colexec/aggregators_test.go, line 95 at r2 (raw file):
```go
},
name:  "hash-partial-order",
order: Partial,
```
Wanted feedback on adding the partial order case to this list. It improves coverage by adding it to several unit tests, but it also adds it to several benchmarks, which increases the number of tests run by ~30%. Should we keep this, remove it, or judiciously skip this ordering in some cases?
pkg/sql/colexec/hash_aggregator_test.go, line 496 at r2 (raw file):
```go
chunkSizes := []int{1, 32, coldata.BatchSize(), coldata.BatchSize() + 1}
// groupSizes is the total number of groups.
groupSizes := []int{1, 32, 128, coldata.BatchSize() + 1}
```
Looking for feedback on useful test cases. As you may have noticed looking at the benchmark results, this benchmark runs a lot of cases so it may be good to prune. I think the `coldata.BatchSize()+1` is useful to span batch boundaries, maybe we can prune some smaller cases?
pkg/sql/colexec/hash_aggregator_tmpl.go, line 167 at r2 (raw file):
```go
// If partialOrder is false, there are no guarantees on ordering and all input
// tuples are processed before emitting any data.
// execgen:template<partialOrder>
```
Note: I tried to inline this function, too, but it doesn't generate any code and `make execgen` doesn't log any error. Any idea why that is?
Very nice work! As often, I have a bunch of nits but also some hopefully helpful comments.
nit: the PR description will be included by bors into the merge commit message, so it's good practice to wrap lines at 80 characters there too.
Reviewed 33 of 33 files at r1, 3 of 3 files at r2, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @michae2, @rharding6373, and @rytaft)
-- commits, line 60 at r2:
nit: s/bending/pending/.
pkg/sql/colexec/aggregators_test.go, line 62 at r1 (raw file):
```go
}

type Ordering int64
```
nit: does this have to be exported?
pkg/sql/colexec/aggregators_test.go, line 152 at r1 (raw file):
```go
if !tc.unorderedInput {
	if len(tc.orderedCols) == 0 {
		// If we have ordered on grouping columns input, then we'll require the
```
nit: s/ordered/ordering/.
pkg/sql/colexec/aggregators_test.go, line 162 at r1 (raw file):
```go
// In case there is partial ordering on the input, note the subset of ordered columns.
tc.spec.OrderedGroupCols = tc.orderedCols
outputOrdering := execinfrapb.Ordering{Columns: make([]execinfrapb.Ordering_Column, len(tc.orderedCols))}
```
nit: we could extract the creation of `outputOrdering` to be outside of the `if` branches to save on some duplication.
pkg/sql/colexec/aggregators_test.go, line 981 at r1 (raw file):
```go
distinctProb float64, numInputRows int, chunkSize int,
```
Do we want to ensure that `chunkSize > groupSize` for the partial order type?
pkg/sql/colexec/aggregators_test.go, line 1018 at r1 (raw file):
```go
curGroup := -1
for i := 0; i < numInputRows; i++ {
	if groupSize == 1 || i%groupSize == 0 {
```
nit: I think this could be simplified a bit because `i % groupSize` is equal to 0 when `groupSize == 1`, so we can just keep the second part of the conditional.
pkg/sql/colexec/aggregators_test.go, line 1033 at r1 (raw file):
```go
groups = cols[1].Int64()
curChunk := -1
numGroups := groupSize / chunkSize
```
I think we want to use the reverse here, i.e. `chunkSize / groupSize`. My reasoning is that a "chunk" might contain multiple groups, so for example if we have `chunkSize == 4`, `groupSize == 2`, then we want to populate something like `(0, 1), (0, 0), (0, 0), (0, 1), (1, 0), (1, 1), (1, 0), (1, 1)`.
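For concreteness, here is a self-contained sketch of that population scheme. The helper and its deterministic layout are illustrative only; the actual test randomizes group values and writes into coldata vectors:

```go
package main

import "fmt"

// makeGroupCols fills two grouping columns: the first is ordered and advances
// every chunkSize rows; the second cycles through chunkSize/groupSize distinct
// values within each chunk, so each chunk contains multiple groups.
func makeGroupCols(numRows, chunkSize, groupSize int) (chunks, groups []int64) {
	chunks = make([]int64, numRows)
	groups = make([]int64, numRows)
	numGroups := chunkSize / groupSize
	for i := 0; i < numRows; i++ {
		chunks[i] = int64(i / chunkSize)
		groups[i] = int64((i / groupSize) % numGroups)
	}
	return chunks, groups
}

func main() {
	chunks, groups := makeGroupCols(8, 4 /* chunkSize */, 2 /* groupSize */)
	for i := range chunks {
		fmt.Printf("(%d, %d) ", chunks[i], groups[i])
	}
	fmt.Println() // (0, 0) (0, 0) (0, 1) (0, 1) (1, 0) (1, 0) (1, 1) (1, 1)
}
```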
pkg/sql/colexec/aggregators_test.go, line 95 at r2 (raw file):
Previously, rharding6373 (Rachael Harding) wrote…
Wanted feedback on adding the partial order case to this list. It improves coverage by adding it to several unit tests, but it also adds it to several benchmarks, which increases the number of tests run by ~30%. Should we keep this, remove it, or judiciously skip this ordering in some cases?
I think we don't want to add the partial order type in this list because all of the existing tests and benchmarks will not exercise the partial ordering in any way, so we will be simply duplicating the test runs of the unordered hash aggregator.
Maybe we should define a separate list `aggTypesWithPartial` that does include the partial order and use that list in a couple of places where it makes sense?
pkg/sql/colexec/case_test.go, line 89 at r1 (raw file):
```go
} {
	colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tc.tuples}, tc.expected, colexectestutils.OrderedVerifier, nil /* orderedCols */,
```
Rather than having to plumb `nil /* orderedCols */` in a lot of callsites, it might be cleaner to keep the signature of `RunTests` unchanged but to introduce `RunTestsOrderedCols` that would take in an `orderedCols` argument; then we'd call the new method in the aggregator tests and would refactor `RunTests` to call the new method with `nil` for `orderedCols`.
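A sketch of the suggested shape, with signatures drastically simplified relative to the real `RunTests` (which also takes an allocator, a verifier, a constructor, and so on):

```go
package colexectestutils

import "testing"

// Tuples stands in for the package's real tuple type.
type Tuples [][]interface{}

// RunTests keeps its existing signature; all current call sites stay
// untouched because it simply delegates with nil orderedCols.
func RunTests(t *testing.T, tups []Tuples, expected Tuples) {
	RunTestsOrderedCols(t, tups, expected, nil /* orderedCols */)
}

// RunTestsOrderedCols is the new entry point; only the aggregator tests
// need to pass a non-nil orderedCols to get the partial-order verifier.
func RunTestsOrderedCols(t *testing.T, tups []Tuples, expected Tuples, orderedCols []uint32) {
	_ = orderedCols // the real body would choose the verifier based on this
}
```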
pkg/sql/colexec/external_distinct_test.go, line 194 at r1 (raw file):
```go
[]colexectestutils.Tuples{tups}, [][]*types.T{typs}, expected,
```
nit: why move this line?
pkg/sql/colexec/external_hash_aggregator_test.go, line 186 at r1 (raw file):
```go
numForcedRepartitions := rng.Intn(5)
HashAggregationDiskSpillingEnabled.Override(ctx, &flowCtx.Cfg.Settings.SV, true)
for _, memoryLimitBytes := range []int64{hashAggregatorAllocSize * sizeOfAggBucket, hashAggregatorAllocSize * sizeOfAggBucket * 2} {
```
Is the only difference between `TestExternalHashAggregator` and `TestExternalHashAggregatorDiskSpiller` the fact that we're using different memory limits in the latter? If so, I think it'd be better to refactor the former to include the different memory limits.
How about we introduce something like
```go
type testConfig struct {
	diskSpillingEnabled bool
	spillForced         bool
	memoryLimitBytes    int64
}
```
and then we can populate all the configs we're interested in.
pkg/sql/colexec/hash_aggregator.go, line 208 at r2 (raw file):
```go
aggFnsAlloc, inputArgsConverter, toClose, err := colexecagg.NewAggregateFuncsAlloc(
	args, args.Spec.Aggregations, hashAggregatorAllocSize, colexecagg.HashAggKind,
)
```
Since we're overriding the `err` value below now, we should add an `if err != nil` check here and then remove the err check in the partial order case.
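The concern is the usual Go pitfall of reusing an `err` variable; a self-contained illustration (all names invented, not the hash aggregator code):

```go
package main

import (
	"errors"
	"fmt"
)

func newAlloc() (string, error)      { return "", errors.New("alloc failed") }
func newDistincter() (string, error) { return "distincter", nil }

func build(partialOrder bool) (string, error) {
	a, err := newAlloc()
	if err != nil { // must be checked here: the branch below reassigns err,
		return "", err // which would silently mask this failure
	}
	if partialOrder {
		var d string
		d, err = newDistincter()
		if err != nil {
			return "", err
		}
		a += d
	}
	return a, nil
}

func main() {
	fmt.Println(build(true)) // surfaces the alloc error instead of masking it
}
```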
pkg/sql/colexec/hash_aggregator.go, line 232 at r2 (raw file):
```go
hashAgg.distincterInput = &colexecop.FeedOperator{}
hashAgg.distincter, hashAgg.distinctOutput, err = colexecbase.OrderedDistinctColsToOperators(
	hashAgg.distincterInput, args.Spec.OrderedGroupCols, args.InputTypes, true /* nullsAreDistinct */,
```
I think we should use `false` for `nullsAreDistinct`. If this is indeed a bug, it'd be good to add a test case that exposes the bug.
pkg/sql/colexec/hash_aggregator_test.go, line 465 at r1 (raw file):
```go
	},
} {
	benchmarkAggregateFunction(b, agg, aggFn, []*types.T{types.Int}, 1, groupSize, 0, numInputRows, 0, 0)
```
nit: it's helpful to have inlined comments for the constants.
pkg/sql/colexec/hash_aggregator_test.go, line 493 at r1 (raw file):
```go
aggFn := execinfrapb.Min
numRows := []int{1, 32, coldata.BatchSize(), 32 * coldata.BatchSize(), 1024 * coldata.BatchSize()}
// chunkSizes is the number distinct values in the ordered group column.
```
nit: s/number distinct/number of distinct/.
pkg/sql/colexec/hash_aggregator_test.go, line 516 at r1 (raw file):
```go
for _, agg := range []aggType{
	{
		new: func(args *colexecagg.NewAggregatorArgs) (colexecop.ResettableOperator, error) {
```
nit: we could remove some duplication for these two constructors.
pkg/sql/colexec/hash_aggregator_test.go, line 496 at r2 (raw file):
Previously, rharding6373 (Rachael Harding) wrote…
Looking for feedback on useful test cases. As you may have noticed looking at the benchmark results, this benchmark runs a lot of cases so it may be good to prune. I think the `coldata.BatchSize()+1` is useful to span batch boundaries, maybe we can prune some smaller cases?
Yeah, it'd be good to prune things a bit, but I don't have a good feeling for which cases we'd like to keep. I guess I'd remove the `coldata.BatchSize()` one for `chunkSize` if we're keeping `coldata.BatchSize()+1`. Maybe also keep only two options for `limits`.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 127 at r2 (raw file):
```go
	op *hashAggregator, start int, end int, sel []int, useSel bool, ascending bool,
) (bool, int) {
	if ascending {
```
We could eliminate some bounds checks in the loops below (https://www.ardanlabs.com/blog/2018/04/bounds-check-elimination-in-go.html). Leaving a TODO for this seems good, or we can do that right away - let me know if you need some guidance.
Actually, now that I've typed this out, quickly discussing bounds checks elimination could be a good topic for our Thursday's collaboration session.
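For reference, a minimal sketch of the pattern from the linked article (an illustrative function, not the template code): reslicing once up front lets the compiler prove that every access in the loop is in bounds.

```go
package main

import "fmt"

// firstZero scans sel[start:end). Reslicing once gives the compiler a slice
// whose length equals the scan range, so sel[i] inside the loop needs no
// per-iteration bounds check (inspect with the compiler's check_bce debug flag).
func firstZero(sel []int, start, end int) int {
	if start < 0 || end > len(sel) || start >= end {
		return -1
	}
	sel = sel[start:end]
	for i := range sel {
		if sel[i] == 0 {
			return start + i
		}
	}
	return -1
}

func main() {
	fmt.Println(firstZero([]int{3, 1, 0, 2}, 1, 4)) // prints 2
}
```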
pkg/sql/colexec/hash_aggregator_tmpl.go, line 139 at r2 (raw file):
```go
	}
} else {
	for i := start; i >= end; i-- {
```
Should `end` be exclusive here? Currently for the `ascending` case we work on `[start, end)` but for the `descending` case on `[start, end]`.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 165 at r2 (raw file):
```go
// buffering more incoming tuples.
//
// If partialOrder is false, there are no guarantees on ordering and all input
```
nit: I'd explicitly say "no guarantees on input ordering" because the ordering of grouping columns is preserved by the hash aggregator, even if that ordering is arbitrary.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 167 at r2 (raw file):
Previously, rharding6373 (Rachael Harding) wrote…
Note: I tried to inline this function, too, but it doesn't generate any code and `make execgen` doesn't log any error. Any idea why that is?
In `hashAggregator.Next` maybe try something like

```go
batch := getNext(op, true)
return batch
```

(I'm thinking that there might be a limitation where an inlined function cannot be used in the "return context".)
Also, I think `execgen` supports verbose mode, so it might be worth enabling it locally. You can build the `execgen` binary and then attempt to generate `hash_aggregator.eg.go`. You can also actually run `execgen` via the debugger and get some idea of what's going on internally.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 349 at r2 (raw file):
```go
if op.bufferingState.pendingBatch.Length() > 0 {
	// Clear the buckets.
	op.state = hashAggregatorBuffering
```
I think it'd be good to refactor the `hashAggregator.Reset` method to split out the logic that we want to do here into a separate method (`resetPartial` or something) that we'd call here.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 360 at r2 (raw file):
```go
op.inputTrackingState.zeroBatchEnqueued = false
if op.bufferingState.unprocessedIdx < op.bufferingState.pendingBatch.Length() {
	b := colexecutils.NewAppendOnlyBufferedBatch(op.accountingHelper.Allocator, op.inputTypes, nil /* colsToStore */)
```
Rather than creating a new batch to append the unprocessed tuples so that we could enqueue them into the input tracker, we could adjust the selection vector on the pending batch to include only the unprocessed tuples, enqueue the pending batch, and then restore the original state of the pending batch. We have `colexecutils.UpdateBatchState` that might be helpful.
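A self-contained sketch of that idea using plain slices; the real code works on `coldata.Batch` (with `Selection`/`SetLength` and helpers like `colexecutils.UpdateBatchState`), so this only shows the shape of the trick:

```go
package main

import "fmt"

func main() {
	// sel is the pending batch's selection vector; tuples before
	// unprocessedIdx have already been emitted.
	sel := []int{4, 7, 9, 12}
	length, unprocessedIdx := 4, 2

	// Save the original view, shrink the logical batch to the unprocessed
	// suffix, enqueue that view, then restore the original state.
	saved := append([]int(nil), sel[:length]...)
	n := copy(sel, sel[unprocessedIdx:length])
	fmt.Println("enqueued view:", sel[:n]) // enqueued view: [9 12]

	copy(sel, saved)
	fmt.Println("restored:", sel[:length]) // restored: [4 7 9 12]
}
```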
pkg/sql/colexec/colexectestutils/utils.go, line 1303 at r1 (raw file):
```go
distincterInput := &colexecop.FeedOperator{}
distincter, distinctOutput, err := colexecbase.OrderedDistinctColsToOperators(
	distincterInput, r.orderedCols, r.typs, true /* nullsAreDistinct */,
```
I think for aggregation nulls are treated as equal, so we want to use `false` for `nullsAreDistinct`:

```
[email protected]:26257/defaultdb> create table t (a int primary key, b int);
CREATE TABLE

Time: 3ms total (execution 3ms / network 0ms)

[email protected]:26257/defaultdb> insert into t values (1, null), (2, null);
INSERT 2

Time: 6ms total (execution 6ms / network 0ms)

[email protected]:26257/defaultdb> select array_agg(a) from t group by b;
  array_agg
-------------
  {1,2}
(1 row)
```
pkg/sql/colexec/colexectestutils/utils.go, line 1315 at r1 (raw file):
```go
if err := colexecerror.CatchVectorizedRuntimeError(func() {
	for {
		// Get actual batch
```
nit: missing period.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @michae2, @rharding6373, and @rytaft)
pkg/sql/colexec/hash_aggregator_tmpl.go, line 360 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Rather than creating a new batch to append the unprocessed tuples so that we could enqueue them into the input tracker, we could adjust the selection vector on the pending batch to include only the unprocessed tuples, enqueue the pending batch, and then restore the original state of the pending batch. We have `colexecutils.UpdateBatchState` that might be helpful.
BTW I had to do something similar in this commit 1e76b8a.
Force-pushed from 3d5f646 to e192dce.
TFTR! I still need to fix an import dependency cycle, but wanted to address review comments first.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @michae2, @rytaft, and @yuzefovich)
pkg/sql/colexec/aggregators_test.go, line 62 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: does this have to be exported?
Done.
pkg/sql/colexec/aggregators_test.go, line 152 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: s/ordered/ordering/.
Done.
pkg/sql/colexec/aggregators_test.go, line 162 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: we could extract the creation of `outputOrdering` to be outside of the `if` branches to save on some duplication.
Done.
pkg/sql/colexec/aggregators_test.go, line 981 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Do we want to ensure that `chunkSize > groupSize` for the partial order type?
Done.
pkg/sql/colexec/aggregators_test.go, line 1018 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: I think this could be simplified a bit because `i % groupSize` is equal to 0 when `groupSize == 1`, so we can just keep the second part of the conditional.
Done.
pkg/sql/colexec/aggregators_test.go, line 1033 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
I think we want to use the reverse here, i.e. `chunkSize / groupSize`. My reasoning is that a "chunk" might contain multiple groups, so for example if we have `chunkSize == 4`, `groupSize == 2`, then we want to populate something like `(0, 1), (0, 0), (0, 0), (0, 1), (1, 0), (1, 1), (1, 0), (1, 1)`.
I was trying to generate the same number of groups that would be used in the non-partial ordered case with the same `groupSize`, to make it easier to compare, but perhaps that's too confusing (it also makes `groupSize` a misnomer). I'll change it to your suggestion, which is more intuitive.
pkg/sql/colexec/aggregators_test.go, line 95 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
I think we don't want to add the partial order type in this list because all of the existing tests and benchmarks will not exercise the partial ordering in any way, so we will be simply duplicating the test runs of the unordered hash aggregator.
Maybe we should define a separate list `aggTypesWithPartial` that does include the partial order and use that list in a couple of places where it makes sense?
Thanks for the feedback! I added `aggTypesWithPartial` to `TestAggregatorRandom`, which is the main test where more coverage would benefit partial ordering.
pkg/sql/colexec/external_hash_aggregator_test.go, line 186 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Is the only difference between `TestExternalHashAggregator` and `TestExternalHashAggregatorDiskSpiller` the fact that we're using different memory limits in the latter? If so, I think it'd be better to refactor the former to include the different memory limits. How about we introduce something like

```go
type testConfig struct {
	diskSpillingEnabled bool
	spillForced         bool
	memoryLimitBytes    int64
}
```

and then we can populate all the configs we're interested in.
Good suggestion. Done.
pkg/sql/colexec/hash_aggregator.go, line 208 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Since we're overriding the `err` value below now, we should add an `if err != nil` check here and then remove the err check in the partial order case.
Done.
pkg/sql/colexec/hash_aggregator.go, line 232 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
I think we should use `false` for `nullsAreDistinct`. If this is indeed a bug, it'd be good to add a test case that exposes the bug.
Thanks for catching this. Added a test case, verified that it failed if the verifier is set to false when this stays true, and fixed.
pkg/sql/colexec/hash_aggregator_test.go, line 465 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: it's helpful to have inlined comments for the constants.
Done.
pkg/sql/colexec/hash_aggregator_test.go, line 493 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: s/number distinct/number of distinct/.
Done.
pkg/sql/colexec/hash_aggregator_test.go, line 516 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: we could remove some duplication for these two constructors.
Done.
pkg/sql/colexec/hash_aggregator_test.go, line 496 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Yeah, it'd be good to prune things a bit, but I don't have a good feeling for which cases we'd like to keep. I guess I'd remove the `coldata.BatchSize()` one for `chunkSize` if we're keeping `coldata.BatchSize()+1`. Maybe also keep only two options for `limits`.
Done.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 127 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
We could eliminate some bounds checks in the loops below (https://www.ardanlabs.com/blog/2018/04/bounds-check-elimination-in-go.html). Leaving a TODO for this seems good, or we can do that right away - let me know if you need some guidance.
Actually, now that I've typed this out, quickly discussing bounds checks elimination could be a good topic for our Thursday's collaboration session.
Done. Thanks for the help! This passes the linter.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 139 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Should `end` be exclusive here? Currently for the `ascending` case we work on `[start, end)` but for the `descending` case on `[start, end]`.
I've changed this so that the range is inclusive to make BCE easier, and noted that it's inclusive.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 165 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: I'd explicitly say "no guarantees on input ordering" because the ordering of grouping columns is preserved by the hash aggregator, even if that ordering is arbitrary.
Done.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 167 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
In `hashAggregator.Next` maybe try something like

```go
batch := getNext(op, true)
return batch
```

(I'm thinking that there might be a limitation where an inlined function cannot be used in the "return context".)
Also, I think `execgen` supports verbose mode, so it might be worth enabling it locally. You can build the `execgen` binary and then attempt to generate `hash_aggregator.eg.go`. You can also actually run `execgen` via the debugger and get some idea of what's going on internally.
Execgen seems to really struggle with this one. I have given up on inlining this for now.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 349 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
I think it'd be good to refactor the `hashAggregator.Reset` method to split out the logic that we want to do here into a separate method (`resetPartial` or something) that we'd call here.
Done.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 360 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
BTW I had to do something similar in this commit 1e76b8a.
Done.
pkg/sql/colexec/colexectestutils/utils.go, line 1303 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
I think for aggregation nulls are treated as equal, so we want to use `false` for `nullsAreDistinct`:

```
[email protected]:26257/defaultdb> create table t (a int primary key, b int);
CREATE TABLE

Time: 3ms total (execution 3ms / network 0ms)

[email protected]:26257/defaultdb> insert into t values (1, null), (2, null);
INSERT 2

Time: 6ms total (execution 6ms / network 0ms)

[email protected]:26257/defaultdb> select array_agg(a) from t group by b;
  array_agg
-------------
  {1,2}
(1 row)
```
Thanks for testing this. Fixed.
pkg/sql/colexec/colexectestutils/utils.go, line 1315 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: missing period.
Done.
pkg/sql/colexec/case_test.go, line 89 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Rather than having to plumb `nil /* orderedCols */` in a lot of callsites, it might be cleaner to keep the signature of `RunTests` unchanged but to introduce `RunTestsOrderedCols` that would take in an `orderedCols` argument; then we'd call the new method in the aggregator tests and would refactor `RunTests` to call the new method with `nil` for `orderedCols`.
Thank you for the suggestion! Done.
pkg/sql/colexec/external_distinct_test.go, line 194 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: why move this line?
Unintended consequence of using refactor in goland and trying to fix up new lines manually. Should be reverted now.
The import cycle looks like it may have resolved as part of my other changes. Will wait for TC to see if there are other failures.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @michae2, @rytaft, and @yuzefovich)
Force-pushed from e192dce to 0c8277f.
Looks good! I think it's almost ready to go.
Reviewed 5 of 35 files at r3, 5 of 5 files at r5, 3 of 3 files at r6, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @michae2, @rharding6373, @rytaft, and @yuzefovich)
pkg/sql/colexec/aggregators_test.go, line 65 at r5 (raw file):
```go
const (
	Ordered ordering = iota
```
ditto for unexporting things.
pkg/sql/colexec/aggregators_test.go, line 160 at r5 (raw file):
```go
	tc.spec.OrderedGroupCols = tc.orderedCols
}
// If we have ordering on grouping columns input, then we'll require the
```
nit: something is off in "on grouping columns input"; it reads oddly to me.
pkg/sql/colexec/external_hash_aggregator_test.go, line 38 at r5 (raw file):
```go
)

type testConfig struct {
```
nit: you could actually define an anonymous struct for this case since it is only used in one place, with something like

```go
for _, cfg := range []struct {
	diskSpillingEnabled bool
	spillForced         bool
	memoryLimitBytes    int64
}{
	{diskSpillingEnabled: true, spillForced: true},
} {
	// run test with cfg
}
```
pkg/sql/colexec/hash_aggregator.go, line 461 at r6 (raw file):
```go
}

func (op *hashAggregator) ResetBucketsAndTrackingState(ctx context.Context) {
```
nit: this could be unexported.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 127 at r2 (raw file):
Previously, rharding6373 (Rachael Harding) wrote…
Done. Thanks for the help! This passes the linter.
👍
pkg/sql/colexec/hash_aggregator_tmpl.go, line 167 at r2 (raw file):
Previously, rharding6373 (Rachael Harding) wrote…
Execgen seems to really struggle with this one. I have given up on inlining this for now.
Sounds good.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 360 at r2 (raw file):
Previously, rharding6373 (Rachael Harding) wrote…
Done.
Hm, it's interesting that we don't need to restore the original state of the pending batch. I think it deserves a comment here.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 373 at r6 (raw file):
```go
sel := op.bufferingState.pendingBatch.Selection()
if sel != nil {
	for i := 0; i < op.bufferingState.pendingBatch.Length()-op.bufferingState.unprocessedIdx; i++ {
```
On my PR where I had to apply the same trick, the linter complained that this `for` loop sliding of elements can be replaced with `copy` - as it turns out, `copy` supports the correct behavior when the destination and the source slices are the same object.
pkg/sql/colexec/colexectestutils/utils.go, line 1315 at r6 (raw file):
```go
func (r *OpTestOutput) VerifyPartialOrder() error {
	distincterInput := &colexecop.FeedOperator{}
	distincter, distinctOutput, err := colexecbase.OrderedDistinctColsToOperators(
```
The import cycle is present because of the usage of the `colexecbase` package here. One way to go around it is via dependency injection:

- define an exported function variable in the `colexectestutils` package with something like

```go
var OrderedDistinctColsToOperators func(
	input colexecop.Operator, distinctCols []uint32, typs []*types.T, nullsAreDistinct bool,
) (colexecop.ResettableOperator, []bool, error)
```

- then in `colexec/inject_setup_test.go` do

```go
colexectestutils.OrderedDistinctColsToOperators = colexecbase.OrderedDistinctColsToOperators
```

- then here in `VerifyPartialOrder` use `OrderedDistinctColsToOperators` from the same (`colexectestutils`) package.
Force-pushed from 0c8277f to 557b63f.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @michae2, @rytaft, and @yuzefovich)
pkg/sql/colexec/aggregators_test.go, line 65 at r5 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
ditto for unexporting things.
Done.
pkg/sql/colexec/aggregators_test.go, line 160 at r5 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: something is off in "on grouping columns input", it reads unusual to me.
Done.
pkg/sql/colexec/external_hash_aggregator_test.go, line 38 at r5 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: you could actually define an anonymous struct for this case since it is only used in one place, with something like

```go
for _, cfg := range []struct {
	diskSpillingEnabled bool
	spillForced         bool
	memoryLimitBytes    int64
}{
	{diskSpillingEnabled: true, spillForced: true},
} {
	// run test with cfg
}
```
Done.
pkg/sql/colexec/hash_aggregator.go, line 461 at r6 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: this could be unexported.
Done.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 360 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Hm, it's interesting that we don't need to restore the original state of the pending batch. I think it deserves a comment here.
Done. I didn't look in detail at how `rbatch` was used in the mergejoiner implementation to see if it could benefit from this, too. Basically, by setting up the pending batch selection vector the way you suggested, we've removed all the tuples we've already emitted and don't care about, and we kept only the unprocessed tuples we still need. So we can continue operation using the modified pending batch (which will now definitely have a selection vector if it didn't already) provided we update `unprocessedIdx`.
Unless there is an issue with mixing batches with and without selection vectors or using the dummy selection vector that I overlooked...it looks like the pending batch values will be copied into the buffered tuples when they're appended, though, so I think this is ok to do.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 373 at r6 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
On my PR where I had to apply the same trick, the linter complained that this `for` loop sliding of elements can be replaced with `copy` - as it turns out, `copy` supports the correct behavior when the destination and the source slices are the same object.
Done.
pkg/sql/colexec/colexectestutils/utils.go, line 1315 at r6 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
The import cycle is present because of the usage of the `colexecbase` package here. One way to go around it is via dependency injection:

- define an exported function variable in the `colexectestutils` package with something like

```go
var OrderedDistinctColsToOperators func(
	input colexecop.Operator, distinctCols []uint32, typs []*types.T, nullsAreDistinct bool,
) (colexecop.ResettableOperator, []bool, error)
```

- then in `colexec/inject_setup_test.go` do

```go
colexectestutils.OrderedDistinctColsToOperators = colexecbase.OrderedDistinctColsToOperators
```

- then here in `VerifyPartialOrder` use `OrderedDistinctColsToOperators` from the same (`colexectestutils`) package.
Thank you! For some reason when I ran the linter locally it didn't bring up the dependency cycle, so I thought I may have gotten rid of it inadvertently. This hopefully eliminates it.
Force-pushed from 557b63f to 35f5bc9.
Could you also run the benchmarks again and share the results so that we have the final numbers for this PR for posterity?
nit: the PR description will be included by bors into the merge commit message, so it's good practice to wrap lines at 80 characters there too.
Reviewed 10 of 10 files at r7, 4 of 4 files at r8, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @michae2, @rharding6373, @rytaft, and @yuzefovich)
pkg/sql/colexec/distinct_test.go, line 569 at r7 (raw file):
```go
	},
}
distinctNames := []string{"unordered", "PartiallyOrdered", "ordered"}
```
We should keep these names unchanged so that we could compare the benchmark runs on different SHAs.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 360 at r2 (raw file):
Previously, rharding6373 (Rachael Harding) wrote…
Done. I didn't look in detail at how `rbatch` was used in the mergejoiner implementation to see if it could benefit from this, too. Basically, by setting up the pending batch selection vector the way you suggested, we've removed all the tuples we've already emitted and don't care about, and we kept only the unprocessed tuples we still need. So we can continue operation using the modified pending batch (which will now definitely have a selection vector if it didn't already) provided we update `unprocessedIdx`. Unless there is an issue with mixing batches with and without selection vectors or using the dummy selection vector that I overlooked...it looks like the pending batch values will be copied into the buffered tuples when they're appended, though, so I think this is ok to do.
Yeah, this reasoning definitely makes sense to me - I originally thought that we had to restore things (since we almost always have to), but then after thinking for a bit I came to the same conclusion. Your comment looks good, so in the future we won't have to think from scratch every time :)
In the case of the merge joiner, things are a bit more complicated (there is a lot more internal state), so I think it'd be safer to just restore the state.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 373 at r6 (raw file):
Previously, rharding6373 (Rachael Harding) wrote…
Done.
I think we should specify the length to copy - it is possible that `len(sel)` is larger than `pendingBatch.Length()`, so we might be copying more than necessary; I'd use `copy(sel, sel[op.bufferingState.unprocessedIdx:op.bufferingState.pendingBatch.Length()])`. Also curious - what is the purpose of `_ = copy`? I think it returns the number of elements copied, but we can just ignore it, without the need for the `_ =` part.
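A quick self-contained check of both points (`copy` is memmove-like for overlapping slices, and its result needs no `_ =`):

```go
package main

import "fmt"

func main() {
	// The selection vector's capacity can exceed the batch length, so the
	// source of the copy must be bounded by the batch length, not len(sel).
	sel := []int{0, 1, 2, 3, 4, 5, 99, 99} // slots past length 6 are garbage
	batchLength, unprocessedIdx := 6, 2

	// copy handles overlapping src/dst within the same slice correctly,
	// and its int result can simply be ignored (no `_ =` required).
	copy(sel, sel[unprocessedIdx:batchLength])
	fmt.Println(sel[:batchLength-unprocessedIdx]) // [2 3 4 5]
}
```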
pkg/sql/colexec/colexectestutils/utils.go, line 1315 at r6 (raw file):
Previously, rharding6373 (Rachael Harding) wrote…
Thank you! For some reason when I ran the linter locally it didn't bring up the dependency cycle, so I thought I may have gotten rid of it inadvertently. This hopefully eliminates it.
I don't think that we have a linter that finds the import cycles - the way to see them is to attempt to compile the package that is part of the import cycle (which would also happen when running the tests).
Force-pushed from 35f5bc9 to 5f8e191.
TFTR! (and the reminder to resize the original message...)
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @michae2, @rharding6373, @rytaft, and @yuzefovich)
pkg/sql/colexec/distinct_test.go, line 569 at r7 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
We should keep these names unchanged so that we could compare the benchmark runs on different SHAs.
Thanks for catching this.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 360 at r2 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Yeah, this reasoning definitely makes sense to me - I originally thought that we had to restore things (since we almost always have to), but then after thinking for a bit I came to the same conclusion. Your comment looks good, so in the future we won't have to think from scratch every time :)
In the case of the merge joiner, things are a bit more complicated (there is a lot more internal state), so I think it'd be safer to just restore the state.
Done.
pkg/sql/colexec/hash_aggregator_tmpl.go, line 373 at r6 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
I think we should specify the length to copy - it is possible that
len(sel)
is larger thanpendingBatch.Length()
, so we might be copying more than necessary, so I'd usecopy(sel, sel[op.bufferingState.unprocessedIdx:op.bufferingState.pendingBatch.Length()]
Also curious - what is the purpose of
_ = copy
? I think it returns the number of elements copied, but we can just ignore it, without the need for_ =
part.
Done. For some reason I thought return values had to be explicitly ignored in Go... clearly I am mistaken.
Before this change, the aggregator benchmark infrastructure used test names to determine whether to order the group column or not. This meant that some newer hash aggregator benchmarks had ordered input when they should have had unordered input to be more representative of a real workload. This also limited development as we add support for partial ordering in the hash aggregator.

This change adds an order to the aggregation benchmark types `aggType`, which can be `Unordered`, `Ordered`, or `Partial`. It also adds a new aggregator to `aggTypes`, which is the hash aggregator with the `Partial` ordering. In the benchmark setup, the behavior for the old hash and ordered aggregators is the same, but when the ordering is `Partial`, the benchmark adds an extra group column, orders the first column, and randomizes the second column.

This change also introduces a new benchmark for partial ordering, `BenchmarkHashAggregatorPartialOrder`. This tests the hash aggregator in both `Partial` and `Unordered` modes with 2 grouping columns. The benchmark introduces chunkSizes and limits as benchmark knobs. ChunkSize is the number of tuples that should be in each distinct ordered group when the order is `Partial`. Limit is the number of tuples that should be returned by the aggregator function before the benchmark ends.

Release note: None
Previously, the hash aggregator assumed that all group columns were unordered. However, if some but not all group columns are ordered, we can take advantage of this and emit aggregated groups as they complete. This allows us to clean up the aggregation buckets, reducing the hash aggregator's memory footprint, and to emit rows earlier.

We support partial ordering of group columns by chunking the input data by the ordered columns. We search the last batch in the buffer for a distinct group boundary. If found, we only aggregate tuples up to that point, then emit the groups. If not found, we process the tuples to clear the buffer, then search for the group boundary. If the end of the group is found (the aggregation stage may be performed multiple times), then the remaining tuples in the group are processed and emitted. In order to support spilling to disk, whenever we emit tuples, we reset the input tracking buffer and re-add any of the remaining tuples in the pending batch to it.

To illustrate with an example, let's say we have a batch size of 2, a buffer size of 4, and the following input grouping columns, where the first column is ordered and the second is not:

[1, 1], [1, 3], [1, 1], [2, 2], [2, 3], [2, 3], [2, 1], [2, 2], [3, 1]

We retrieve the pending batch {[1, 1], [1, 3]} and append it to the input tracker and the input buffer. The buffer is not yet full, so we retrieve the next pending batch {[1, 1], [2, 2]}. This batch has a distinct group at [2, 2], so only [1, 1] is added to the input buffer, which now contains {[1, 1], [1, 3], [1, 1]}. We aggregate the input buffer, emit the tuples, and reset the input tracker. [2, 2] is now cleared from the input tracker, so we need to append it again.

We resume the buffering stage of the hash aggregator. Since the last group was not split, we append [2, 2] to the now empty input buffer. We retrieve the next 2 batches, {[2, 3], [2, 3]} and {[2, 1], [2, 2]}. The input buffer is full after appending [2, 1], but no group boundary was found, so we mark this as a split group and aggregate the tuples in the input buffer. Since the group is not complete, we do not emit the tuples yet. On the next pass in the buffering stage, we append [2, 2] to the input buffer, but still have a split group since no distinct group was found. We fetch the next batch, {[3, 1]}. This is a distinct group, so we trigger the aggregation and outputting stages on the input buffer of {[2, 2]}. Finally, we buffer [3, 1] and perform aggregation and output the last tuple.

We use a template to separate the changes for the partial order case from the original unordered case, since chunking the input and finding distinct group boundaries incurs some overhead.

Release note: None
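A toy model of the control flow described above, using the same input; the map-based aggregation and eager per-chunk emission are illustrative only, not the vectorized implementation:

```go
package main

import "fmt"

// The first grouping column is ordered, the second is not. Tuples are
// buffered only until the ordered column changes (a chunk boundary); the
// finished chunk's groups are then aggregated, emitted eagerly, and the
// buckets cleared, instead of holding every group until end of input.
func main() {
	input := [][2]int{
		{1, 1}, {1, 3}, {1, 1}, {2, 2}, {2, 3}, {2, 3}, {2, 1}, {2, 2}, {3, 1},
	}
	counts := map[[2]int]int{} // per-group COUNT(*) for the current chunk
	emit := func() {
		for g, c := range counts {
			fmt.Printf("group %v: count %d\n", g, c)
		}
		counts = map[[2]int]int{} // "clear the buckets"
	}
	cur := input[0][0]
	for _, t := range input {
		if t[0] != cur { // distinct value in the ordered column: chunk done
			emit()
			cur = t[0]
		}
		counts[t]++
	}
	emit() // final chunk
}
```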
Force-pushed from 5f8e191 to fcf757e.
I updated the gist in the merge commit message with rerun benchmark results: https://gist.github.com/rharding6373/09eface5ed49a00aac540a8ea8304662
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @michae2, @rharding6373, @rytaft, and @yuzefovich)
Thanks!
Reviewed 4 of 4 files at r9, 3 of 3 files at r10, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @michae2, @rharding6373, @rytaft, and @yuzefovich)
bors r+
Thanks for the review!
Build succeeded.
This change consists of two commits. The first adds test coverage for
aggregation with partial ordering of group columns, including adding a new
microbenchmark. The second adds support for partial ordering in the hash
aggregator.
Results from both the new benchmark, HashAggregatorPartialOrder, and existing
hash aggregator benchmarks show improved performance for the partial order
case for lower limit values, and no performance degradation for the unordered
case:
https://gist.github.com/rharding6373/09eface5ed49a00aac540a8ea8304662
This change is fairly large, so I will follow this PR with an update to the optimizer
cost model to take advantage of partial ordering.