colexec: adds support for partial ordering in topk sorter #69905

Merged 2 commits on Sep 21, 2021

rharding6373
Collaborator

Previously, topKSorter had to process all input rows before returning
the top K rows according to its specified ordering. If a subset of the
input rows were already ordered, topKSorter would still iterate over the
entire input.

However, if the input is partially ordered, topKSorter can stop iterating
early: once it has found K candidates and has reached the end of a group of
rows that are equal on the ordered columns, no later row can displace them.

For example, take the following query and table with an index on a:

  a | b
----+----
  1 | 5
  2 | 3
  2 | 1
  3 | 3
  5 | 3

SELECT * FROM t ORDER BY a, b LIMIT 2

Given an index scan on a to provide a's ordering, topk only needs to
process 3 rows in order to guarantee that it has found the top K rows.
Once it finishes processing the third row [2, 1], all subsequent rows
have higher values of a than the top 2 rows found so far, and
therefore cannot be in the top 2 rows.

This change modifies the vectorized engine's TopKSorter signature to include
a partial ordering. The TopKSorter chunks the input according to the
sorted columns and processes each chunk with its existing heap
algorithm. At the end of each chunk, if K rows are in the heap,
TopKSorter emits the rows and stops execution.
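
The chunk-then-heap idea described above can be sketched in plain Go. This is only an illustration under simplifying assumptions, not the vectorized operator: `row`, `maxHeap`, and `topKPartiallyOrdered` are hypothetical names, rows are two-column tuples, and the input is assumed to arrive already ordered on `a`.

```go
package main

import (
	"container/heap"
	"sort"
)

// row is a hypothetical two-column tuple (a, b). The input is assumed to
// arrive already ordered on a.
type row struct{ a, b int }

// less reports whether x sorts before y under ORDER BY a, b.
func less(x, y row) bool {
	if x.a != y.a {
		return x.a < y.a
	}
	return x.b < y.b
}

// maxHeap keeps the largest retained row at the root so that it can be
// replaced when a smaller row arrives.
type maxHeap []row

func (h maxHeap) Len() int            { return len(h) }
func (h maxHeap) Less(i, j int) bool  { return less(h[j], h[i]) }
func (h maxHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *maxHeap) Push(x interface{}) { *h = append(*h, x.(row)) }
func (h *maxHeap) Pop() interface{} {
	old := *h
	r := old[len(old)-1]
	*h = old[:len(old)-1]
	return r
}

// topKPartiallyOrdered returns the first k rows under ORDER BY a, b, along
// with the number of input rows it fed through the heap before stopping.
func topKPartiallyOrdered(input []row, k int) (top []row, processed int) {
	if k <= 0 {
		return nil, 0
	}
	h := &maxHeap{}
	for i, r := range input {
		// A change in a marks the start of a new chunk. If the heap is
		// already full, no later row can beat the rows it holds.
		if i > 0 && r.a != input[i-1].a && h.Len() == k {
			break
		}
		processed++
		if h.Len() < k {
			heap.Push(h, r)
		} else if less(r, (*h)[0]) {
			// Replace the current maximum and restore the heap property.
			(*h)[0] = r
			heap.Fix(h, 0)
		}
	}
	top = append(top, *h...)
	sort.Slice(top, func(i, j int) bool { return less(top[i], top[j]) })
	return top, processed
}
```

On the five-row table above with k = 2, this sketch feeds 3 rows through the heap and returns (1, 5) and (2, 1), in line with the example.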

A later commit, once merged with top K optimizer and distsql changes, will adjust the cost model for top K to reflect this change.

Release note: N/A

@cockroach-teamcity
Member

This change is Reviewable

Collaborator Author

rharding6373 left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)


pkg/sql/colexec/sorttopk.go, line 192 at r1 (raw file):

	// t.topK.
	t.inputBatch = t.Input.Next()
	t.distincterInput.SetBatch(t.inputBatch)

Could use some feedback on what other considerations need to be made (e.g., hash aggregator has to handle spill to disk) that I've overlooked.


pkg/sql/colexec/colbuilder/execplan.go, line 374 at r1 (raw file):

	spoolMemLimit := totalMemLimit * 4 / 5
	maxOutputBatchMemSize := totalMemLimit - spoolMemLimit
	if post.Limit != 0 && post.Limit < math.MaxUint64-post.Offset {

Moved the top k up in the conditional order to allow for testing this feature, though since I didn't see any logictest output change I'm not sure if this was right or if test coverage is poor. I'm not sure if it should be merged with this change, but it shouldn't matter after the optimizer and distsql top k changes are merged and this change is reflected in the optimizer's top k cost model.

rharding6373 force-pushed the topk_partial_order_69724 branch from eb44669 to a58f00e (September 8, 2021 17:01)
Member

yuzefovich left a comment

This looks good! I have some suggestions to make it even better :)

Reviewed 5 of 6 files at r1, 1 of 1 files at r2, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rharding6373)


pkg/sql/colexec/external_sort.go, line 273 at r1 (raw file):

	var err error
	if topK > 0 {
		inMemSorter, err = NewTopKSorter(sortUnlimitedAllocator, inputPartitioner, inputTypes, ordering.Columns, 0 /* matchLen */, topK, inMemSortOutputLimit)

Here we should use specified matchLen that we've just added the plumbing for.


pkg/sql/colexec/sort_test.go, line 200 at r1 (raw file):

	ordCols = generateColumnOrdering(rng, nCols, nOrderingCols)
	partialOrdCols := make([]execinfrapb.Ordering_Column, nPartialOrderingCols)
	copy(partialOrdCols, ordCols[0:nPartialOrderingCols])

nit: you can drop 0, i.e. ordCols[:nPartialOrderingCols] is the same.

Also, do we need a deep copy? I feel like partialOrdCols := ordCols[:nPartialOrderingCols] should just work.
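
To make the trade-off concrete, here is a minimal sketch of the two options. `orderingColumn` is a hypothetical stand-in for `execinfrapb.Ordering_Column`, and both helper names are made up for illustration.

```go
package main

// orderingColumn is a hypothetical stand-in for execinfrapb.Ordering_Column.
type orderingColumn struct {
	ColIdx uint32
	Desc   bool
}

// prefixByReslice returns a view of the first n columns. The result shares
// its backing array with ordCols, so writes through either slice are visible
// through the other.
func prefixByReslice(ordCols []orderingColumn, n int) []orderingColumn {
	return ordCols[:n]
}

// prefixByCopy returns an independent copy of the first n columns.
func prefixByCopy(ordCols []orderingColumn, n int) []orderingColumn {
	out := make([]orderingColumn, n)
	copy(out, ordCols[:n])
	return out
}
```

The reslice is enough as long as neither slice is modified afterwards; the copy only matters if one of them is mutated later.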


pkg/sql/colexec/sort_test.go, line 327 at r1 (raw file):

						var err error
						if topK {
							// TODO(harding): Randomize partial ordering columns, too.

As I mentioned in slack, I'd probably create a separate method for benchmarking the top K sort with partial ordering because BenchmarkSort is already quite complex and we probably want to use a different source.


Update: the TODO can safely be removed.


pkg/sql/colexec/sorttopk.go, line 16 at r1 (raw file):

	"container/heap"
	"context"
	"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecbase"

nit: this line is out of place. Do you have crlfmt enabled (https://cockroachlabs.atlassian.net/wiki/spaces/ENG/pages/154206209/Goland+Tips+and+Tricks#GolandTipsandTricks-EnablecrlfmtWatcher)?


pkg/sql/colexec/sorttopk.go, line 192 at r1 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

Could use some feedback on what other considerations need to be made (e.g., hash aggregator has to handle spill to disk) that I've overlooked.

I think here we are already covered in terms of disk spilling - t.firstUnprocessedTupleIdx correctly tracks the boundary of which tuples were moved into t.topK batch and which stayed within t.inputBatch.

Crucially, we currently don't have a case where we emit any output until we either exhaust the input source or find the boundary of a new group after K tuples have already been buffered. In the hash aggregator case we want to intertwine emitting of the output and processing the incoming data which makes the disk spilling a bit more tricky.


pkg/sql/colexec/sorttopk.go, line 42 at r2 (raw file):

	inputTypes []*types.T,
	orderingCols []execinfrapb.Ordering_Column,
	matchLen int,

super nit: in some places we use matchLen for this, in others nPartialOrderingCols. Let's pick one and use it everywhere, I personally don't have a preference.


pkg/sql/colexec/sorttopk.go, line 60 at r2 (raw file):

	var err error
	base.distincterInput = &colexecop.FeedOperator{}
	base.distincter, base.distinctOutput, err = colexecbase.OrderedDistinctColsToOperators(

OrderedDistinctColsToOperators currently has a non-trivial cost when matchLen is 0. Namely, we will create fnOp that zeroes out base.distinctOutput for each batch coming from the input. I think we either need to remove that cost (by adjusting OrderedDistinctColsToOperators for the special case of empty distinctCols argument) or to call it only if matchLen > 0.


pkg/sql/colexec/sorttopk.go, line 134 at r2 (raw file):

	t.Input.Init(t.Ctx)
	t.topK = colexecutils.NewAppendOnlyBufferedBatch(t.allocator, t.inputTypes, nil /* colsToStore */)
	t.comparators = make([]vecComparator, len(t.inputTypes))

A side comment: we don't actually need to create a vecComparator for each input vector - only for those in orderingCols. Could you please leave a TODO about this?


pkg/sql/colexec/sorttopk.go, line 186 at r2 (raw file):

// in sorted order.
func (t *topKSorter) spool() {
	t.distincter.Init(t.Ctx)

nit: this should be moved into topKSorter.Init.


pkg/sql/colexec/sorttopk.go, line 187 at r2 (raw file):

func (t *topKSorter) spool() {
	t.distincter.Init(t.Ctx)
	t.distincter.(colexecop.Resetter).Reset(t.Ctx)

nit: this should be moved into topKSorter.Reset.


pkg/sql/colexec/sorttopk.go, line 189 at r2 (raw file):

	t.distincter.(colexecop.Resetter).Reset(t.Ctx)
	// Fill up t.topK by spooling up to K rows from the input.
	// We don't need to check for distinct groups until after we have filled

This assumption makes sense to me in the current implementation.

However, we could actually take advantage of the partial ordering even during buffering first K rows if new groups are found. For example, if we have K = 5, batch = {(0, 1), (0, 0), (1, 1), (1, 2)}, we can process first two rows separately and emit them, then lower K to 3 and process the remaining rows. This would allow us to return some rows sooner as well as lower the memory footprint of topK batch. I imagine that it will be a bit ugly to implement this and might not offer the perf gains to justify the complexity, so I'd just leave a TODO.
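
A rough sketch of that idea, assuming two-column rows ordered on the first column. `pair` and `emitCompleteGroups` are hypothetical names, and a group is treated as complete only once a row with a different first column has been seen after it.

```go
package main

import "sort"

// pair is a hypothetical two-column tuple; the batch is assumed to be
// ordered on a.
type pair struct{ a, b int }

// emitCompleteGroups returns the rows of the leading complete groups, sorted
// on (a, b), together with the K that is left for the remaining rows. At
// most k rows are emitted.
func emitCompleteGroups(batch []pair, k int) (emitted []pair, remainingK int) {
	end := 0 // batch[:end] contains only complete groups
	for i := 1; i < len(batch) && i <= k; i++ {
		if batch[i].a != batch[i-1].a {
			end = i
		}
	}
	emitted = append(emitted, batch[:end]...)
	// The prefix is already ordered on a, so a full sort on (a, b) only
	// reorders rows within each group.
	sort.Slice(emitted, func(i, j int) bool {
		if emitted[i].a != emitted[j].a {
			return emitted[i].a < emitted[j].a
		}
		return emitted[i].b < emitted[j].b
	})
	return emitted, k - end
}
```

For K = 5 and the batch {(0, 1), (0, 0), (1, 1), (1, 2)}, this emits (0, 0) and (0, 1) and leaves K = 3 for the rest, since the second group may continue in the next batch.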


pkg/sql/colexec/sorttopk.go, line 191 at r2 (raw file):

	// We don't need to check for distinct groups until after we have filled
	// t.topK.
	t.inputBatch = t.Input.Next()

nit: these three lines are repeated several times, it's probably cleaner to extract them into a helper method. (Maybe also include t.firstUnprocessedTupleIdx = 0 in there.)

You can also add //gcassert:inline as the comment on that method to make sure it is inlined (i.e. there is no perf cost to having a function call). (However, I'm not certain that the method will be inlined, and it is ok if it isn't.)


pkg/sql/colexec/sorttopk.go, line 241 at r2 (raw file):

					// If this is a distinct group, we have already found the top K input,
					// so we can stop comparing the rest of this and subsequent batches.
					if t.distinctOutput[idx] {

I think we should first check whether matchLen is positive, otherwise this will always be false.
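
To illustrate why the flag can never be set without partially ordered columns, here is a simplified model of the distinct output. `groupBoundaries` is a hypothetical helper, not the real `OrderedDistinctColsToOperators`; it marks a row only when it differs from the previous row on the first matchLen columns.

```go
package main

// groupBoundaries is a hypothetical, simplified model of distinctOutput: for
// rows ordered on their first matchLen columns, it marks each row that starts
// a new group relative to the previous row.
func groupBoundaries(rows [][]int, matchLen int) []bool {
	out := make([]bool, len(rows))
	if matchLen <= 0 {
		// No partially ordered columns means no boundaries, so the scan
		// (and any per-row check of the result) can be skipped entirely.
		return out
	}
	for i := 1; i < len(rows); i++ {
		for c := 0; c < matchLen; c++ {
			if rows[i][c] != rows[i-1][c] {
				out[i] = true
				break
			}
		}
	}
	return out
}
```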


pkg/sql/colexec/sorttopk.go, line 307 at r2 (raw file):

func (t *topKSorter) compareRow(vecIdx1, vecIdx2 int, rowIdx1, rowIdx2 int) int {
	for i := range t.orderingCols {

Hm, I think we can optimize this method in order to skip some comparisons altogether or at least skip comparing on the first matchLen columns.

My idea is that alongside the topK vector we store groupIdx := make([]int, t.k), where groupIdx[i] holds the "group number" of the tuple at position i within the topK vector. Whenever we see t.distinctOutput[idx] == true, we increment the group counter. Then if groupIdx[i] < groupIdx[j], we know that row i is smaller than row j; if groupIdx[i] == groupIdx[j], we can skip the comparison on the first matchLen columns because they are identical.

Some care will need to be taken because we have a heap on top of topK vector, but I think we can make this work, and it'll be a nice perf improvement.


pkg/sql/colexec/sorttopk_test.go, line 96 at r2 (raw file):

}

func TestTopKSorter(t *testing.T) {

One idea of a randomized test that might be worth implementing is using the general sort as the oracle and the top K sort as the operator being tested.

We can generate some random input data set, then vary the number of partial ordering columns for top K, sort the data set according to those partial ordering columns, run the general sort to get the expected result set, then vary value of K, take the prefix and use RunTests test harness to check the top K sorter.
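
The shape of such a test could look roughly like this. It is a sketch with hypothetical names (`tuple`, `topKFunc`, `checkAgainstOracle`), using two integer columns and the standard library sort as the oracle instead of the real operators and the RunTests harness.

```go
package main

import (
	"math/rand"
	"sort"
)

// tuple is a hypothetical two-column row; the full ordering is on both
// columns, ascending.
type tuple [2]int

func tupleLess(x, y tuple) bool {
	if x[0] != y[0] {
		return x[0] < y[0]
	}
	return x[1] < y[1]
}

// topKFunc is the operator under test. It receives input that is ordered on
// the first matchLen columns and must return the first k tuples in full order.
type topKFunc func(input []tuple, matchLen, k int) []tuple

// checkAgainstOracle generates random tuples, uses a general sort as the
// oracle, and compares the oracle's prefix with the top K result for every
// matchLen and every k. It returns false on the first mismatch.
func checkAgainstOracle(seed int64, nTuples int, topK topKFunc) bool {
	rng := rand.New(rand.NewSource(seed))
	data := make([]tuple, nTuples)
	for i := range data {
		data[i] = tuple{rng.Intn(4), rng.Intn(4)}
	}
	// Oracle: sort everything once on all ordering columns.
	expected := append([]tuple(nil), data...)
	sort.Slice(expected, func(i, j int) bool { return tupleLess(expected[i], expected[j]) })

	for matchLen := 0; matchLen <= 1; matchLen++ {
		input := append([]tuple(nil), data...)
		if matchLen == 1 {
			// Present the input ordered on the first column only.
			sort.SliceStable(input, func(i, j int) bool { return input[i][0] < input[j][0] })
		}
		for k := 1; k <= nTuples; k++ {
			actual := topK(input, matchLen, k)
			if len(actual) != k {
				return false
			}
			for i := range actual {
				if actual[i] != expected[i] {
					return false
				}
			}
		}
	}
	return true
}
```

Sorting the oracle output once and comparing against its first k entries keeps the per-k work small.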


pkg/sql/colexec/sorttopk_test.go, line 107 at r2 (raw file):

	}
}
func BenchmarkSortTopK(b *testing.B) {

One idea on how to see the improvement of your change is to introduce this benchmark in a separate first commit and then use ./scripts/bench to see the perf change of the second commit that contains the optimization for the partial ordering case.


pkg/sql/colexec/sorttopk_test.go, line 127 at r2 (raw file):

								// batch) * nCols (number of columns / row).
								b.SetBytes(int64(8 * nBatches * coldata.BatchSize() * nCols))
								typs := make([]*types.T, nCols)

It'd be good to extract most of the logic here into a helper method to share with BenchmarkSortChunks.


pkg/sql/colexec/colbuilder/execplan.go, line 374 at r1 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

Moved the top k up in the conditional order to allow for testing this feature, though since I didn't see any logictest output change I'm not sure if this was right or if test coverage is poor. I'm not sure if it should be merged with this change, but it shouldn't matter after the optimizer and distsql top k changes are merged and this change is reflected in the optimizer's top k cost model.

Changing the order of conditionals sounds good to me.

My guess is that we currently don't have any logic tests that use EXPLAIN and that have this top K with partial ordering, it'd be good to add some.

It's probably worth rebasing this commit on top of your other work on top K sort right away.

rharding6373 force-pushed the topk_partial_order_69724 branch 3 times, most recently from dbde0d5 to 8bd7750 (September 10, 2021 01:09)
Collaborator Author

rharding6373 left a comment

Thanks for the feedback! I ran some microbenchmarks: for partially ordered data we see up to a 98% reduction in runtime, but for non-ordered data runtime is consistently up by about 6%. I may need to experiment with the implementation to bring the overhead down. I'm not sure how much overhead we would normally tolerate for a feature like this.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)


pkg/sql/colexec/external_sort.go, line 273 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Here we should use specified matchLen that we've just added the plumbing for.

Done.


pkg/sql/colexec/sort_test.go, line 327 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

As I mentioned in slack, I'd probably create a separate method for benchmarking the top K sort with partial ordering because BenchmarkSort is already quite complex and we probably want to use a different source.


Update: the TODO can safely be removed.

Done.


pkg/sql/colexec/sorttopk.go, line 16 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: this line is out of place. Do you have crlfmt enabled (https://cockroachlabs.atlassian.net/wiki/spaces/ENG/pages/154206209/Goland+Tips+and+Tricks#GolandTipsandTricks-EnablecrlfmtWatcher)?

Hm, I had it configured, but not enabled on save. Looks like it's working now. Thanks!


pkg/sql/colexec/sorttopk.go, line 192 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think here we are already covered in terms of disk spilling - t.firstUnprocessedTupleIdx correctly tracks the boundary of which tuples were moved into t.topK batch and which stayed within t.inputBatch.

Crucially, we currently don't have a case where we emit any output until we either exhaust the input source or find the boundary of a new group after K tuples have already been buffered. In the hash aggregator case we want to intertwine emitting of the output and processing the incoming data which makes the disk spilling a bit more tricky.

Thanks for the feedback.


pkg/sql/colexec/sorttopk.go, line 42 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

super nit: in some places we use matchLen for this, in others nPartialOrderingCols. Let's pick one and use it everywhere, I personally don't have a preference.

Picked matchLen.


pkg/sql/colexec/sorttopk.go, line 60 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

OrderedDistinctColsToOperators currently has a non-trivial cost when matchLen is 0. Namely, we will create fnOp that zeroes out base.distinctOutput for each batch coming from the input. I think we either need to remove that cost (by adjusting OrderedDistinctColsToOperators for the special case of empty distinctCols argument) or to call it only if matchLen > 0.

I added a conditional everywhere we use the distincter. Thanks for the explanation.


pkg/sql/colexec/sorttopk.go, line 134 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

A side comment: we don't actually need to create a vecComparator for each input vector - only for those in orderingCols. Could you please leave a TODO about this?

Done.


pkg/sql/colexec/sorttopk.go, line 186 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: this should be moved into topKSorter.Init.

Done.


pkg/sql/colexec/sorttopk.go, line 187 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: this should be moved into topKSorter.Reset.

Done.


pkg/sql/colexec/sorttopk.go, line 189 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

This assumption makes sense to me in the current implementation.

However, we could actually take advantage of the partial ordering even during buffering first K rows if new groups are found. For example, if we have K = 5, batch = {(0, 1), (0, 0), (1, 1), (1, 2)}, we can process first two rows separately and emit them, then lower K to 3 and process the remaining rows. This would allow us to return some rows sooner as well as lower the memory footprint of topK batch. I imagine that it will be a bit ugly to implement this and might not offer the perf gains to justify the complexity, so I'd just leave a TODO.

That would be interesting. Added a TODO.


pkg/sql/colexec/sorttopk.go, line 191 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: these three lines are repeated several times, it's probably cleaner to extract them into a helper method. (Maybe also include t.firstUnprocessedTupleIdx = 0 in there.)

You can also add //gcassert:inline as the comment on that method to make sure it is inlined (i.e. there is no perf cost to having a function call). (However, I'm not certain that the method will be inlined, and it is ok if it isn't.)

Done.


pkg/sql/colexec/sorttopk.go, line 241 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think we should first check whether matchLen is positive, otherwise this will always be false.

Added to NewTopKSorter


pkg/sql/colexec/sorttopk.go, line 307 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Hm, I think we can optimize this method in order to skip some comparisons altogether or at least skip comparing on the first matchLen columns.

My idea is that alongside the topK vector we store groupIdx := make([]int, t.k), where groupIdx[i] holds the "group number" of the tuple at position i within the topK vector. Whenever we see t.distinctOutput[idx] == true, we increment the group counter. Then if groupIdx[i] < groupIdx[j], we know that row i is smaller than row j; if groupIdx[i] == groupIdx[j], we can skip the comparison on the first matchLen columns because they are identical.

Some care will need to be taken because we have a heap on top of topK vector, but I think we can make this work, and it'll be a nice perf improvement.

Done.


pkg/sql/colexec/sorttopk_test.go, line 96 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

One idea of a randomized test that might be worth implementing is using the general sort as the oracle and the top K sort as the operator being tested.

We can generate some random input data set, then vary the number of partial ordering columns for top K, sort the data set according to those partial ordering columns, run the general sort to get the expected result set, then vary value of K, take the prefix and use RunTests test harness to check the top K sorter.

Added a test to this effect, but after implementing it I don't think that it's adding value beyond what TestSortRandomized already does by using slice sorting for expected output in generateRandomDataForTestSort. Was there something else you had in mind? PTAL.


pkg/sql/colexec/sorttopk_test.go, line 107 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

One idea on how to see the improvement of your change is to introduce this benchmark in a separate first commit and then use ./scripts/bench to see the perf change of the second commit that contains the optimization for the partial ordering case.

That's a good idea...let me split this into two commits.


pkg/sql/colexec/sorttopk_test.go, line 127 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

It'd be good to extract most of the logic here into a helper method to share with BenchmarkSortChunks.

Done.


pkg/sql/colexec/colbuilder/execplan.go, line 374 at r1 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Changing the order of conditionals sounds good to me.

My guess is that we currently don't have any logic tests that use EXPLAIN and that have this top K with partial ordering, it'd be good to add some.

It's probably worth rebasing this commit on top of your other work on top K sort right away.

Rebased the commit and added a new logic test to limit.

Member

yuzefovich left a comment

I think we should try to bring the perf hit to almost zero since we do have tools to help with this. My guess is that most of the perf hit comes from the additional branches we introduced with if t.hasPartialOrder ....


One simple idea might be extracting a local variable to capture the value of t.hasPartialOrder, i.e. something like

func (t *topKSorter) Next() coldata.Batch {
  hasPartialOrder := t.hasPartialOrder
  ...
  if hasPartialOrder ... {
    ...
  }
  ...

My thinking is that if hasPartialOrder is a local variable that is never modified once set, the Go compiler might generate different machine code that makes CPU branch prediction more likely to be correct. This is just speculation, so it might not have any merit.


But what I think should work is templating out the top K sorter to have two different structs topKSorter and topKSorterWithPartialOrder, and only the latter would have the partial ordering logic in its methods and without having to evaluate if t.hasPartialOrder. In NewTopKSorter we do know whether we have the partial ordering or not, so we really need to evaluate if t.hasPartialOrder condition only once, in the constructor.

Unfortunately, I don't think we have good (read: any) documentation about the templating, but I think you could draw inspiration from row_number_tmpl.go and #37282, which converted one operator into the templated form (note that that PR was merged a long time ago, so the files might have been moved or renamed, but the general ideas should be the same).
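
As a very rough illustration of "decide once in the constructor", here is a hand-written sketch with hypothetical names (`spooler`, `fullSpooler`, `partialOrderSpooler`, `newSpooler`). It is not the execgen templating: it uses an interface rather than generated code and only models how many rows each variant would consume, given the values of the first ordering column.

```go
package main

// spooler is a hypothetical, much-simplified stand-in for the top K sorter.
// spool returns how many input rows are consumed to find the top k.
type spooler interface {
	spool(firstCol []int, k int) int
}

// fullSpooler carries no partial-order logic and always reads everything.
type fullSpooler struct{}

func (fullSpooler) spool(firstCol []int, k int) int { return len(firstCol) }

// partialOrderSpooler stops at the first group boundary found once k rows
// have been read. It never has to ask whether a partial order exists.
type partialOrderSpooler struct{}

func (partialOrderSpooler) spool(firstCol []int, k int) int {
	if k <= 0 {
		return 0
	}
	for i := k; i < len(firstCol); i++ {
		if firstCol[i] != firstCol[i-1] {
			return i
		}
	}
	return len(firstCol)
}

// newSpooler evaluates the matchLen > 0 condition exactly once and returns
// the variant that matches it.
func newSpooler(matchLen int) spooler {
	if matchLen > 0 {
		return partialOrderSpooler{}
	}
	return fullSpooler{}
}
```

With the first column values 1, 2, 2, 3, 5 and k = 2, the partial-order variant consumes 3 rows while the plain variant consumes all 5.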

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rharding6373 and @yuzefovich)


pkg/sql/colexec/sorttopk.go, line 121 at r6 (raw file):

	sel []int
	// group stores the group number associated with each entry in topK.
	group []int

We can allocate group precisely in the constructor.


pkg/sql/colexec/sorttopk.go, line 361 at r6 (raw file):

			}
			// The tuples are in different groups, so we don't need to compare actual
			// columns. We use the direction of the first column to determine the

Hm, this sentence makes me wonder whether we currently have a correctness issue because of the different directions of columns. Imagine the following test case:

CREATE TABLE t (c1 INT, c2 INT, c3 INT);
INSERT INTO t VALUES (0, 0, 0), (0, 1, 1);

and for the query

SELECT * FROM t ORDER BY c1 ASC, c2 DESC, c3 ASC;

we already have the partial ordering on c1, c2. In this case the first row has a smaller group id, yet it differs on the second column which has a different direction from the first column (which has equal values). I think we currently will incorrectly return the result in such a scenario.

Am I missing something?

If I'm correct, we might be able to take the full advantage of this group id optimization only if all partially ordered columns have the same direction; if not, then maybe it is correct only to skip first matchLen columns in case the group ids are the same because these group ids only capture whether two rows differ on the partially ordered columns, but not on which columns.
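
A sketch of the conservative variant, where the group id is used only to skip the shared prefix. `orderingCol` and `compareRows` are hypothetical names, rows are plain int slices, and matchLen is assumed to be at most len(cols).

```go
package main

// orderingCol is a hypothetical stand-in for execinfrapb.Ordering_Column.
type orderingCol struct {
	colIdx int
	desc   bool
}

// compareRows returns -1, 0, or 1 depending on whether x sorts before, equal
// to, or after y. The first matchLen ordering columns are skipped only when
// both rows carry the same group id; rows from different groups go through
// the full column-by-column comparison, honoring each column's direction.
func compareRows(x, y []int, groupX, groupY int, cols []orderingCol, matchLen int) int {
	start := 0
	if groupX == groupY {
		// Same group: the rows are equal on the partially ordered prefix.
		start = matchLen
	}
	for _, c := range cols[start:] {
		a, b := x[c.colIdx], y[c.colIdx]
		if a == b {
			continue
		}
		cmp := -1
		if a > b {
			cmp = 1
		}
		if c.desc {
			cmp = -cmp
		}
		return cmp
	}
	return 0
}
```

For the two rows in the example, with ordering c1 ASC, c2 DESC, c3 ASC and matchLen = 2, the rows land in different groups, so the comparison is decided on c2 with its own direction and (0, 1, 1) sorts before (0, 0, 0).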


pkg/sql/colexec/sorttopk_test.go, line 96 at r2 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

Added a test to this effect, but after implementing it I don't think that it's adding value beyond what TestSortRandomized already does by using slice sorting for expected output in generateRandomDataForTestSort. Was there something else you had in mind? PTAL.

Hm, I was mostly concerned about adding the "group id" optimization (especially during heap.Init), so I wanted to increase the test coverage of the top K sort with partial ordering.


pkg/sql/colexec/sorttopk_test.go, line 127 at r5 (raw file):

		typs[i] = types.Int
	}
	for nCols := 1; nCols < maxCols; nCols++ {

nit: maybe nCols <= maxCols?


pkg/sql/colexec/sorttopk_test.go, line 141 at r5 (raw file):

			expectedOut := oracle.Next()

			// Test values for k need to be monotonically increasing, so we can reuse

To simplify this a bit we could populate expected with all nTups tuples and then use expected[:k] when testing the top K.


pkg/sql/colexec/sorttopk_test.go, line 153 at r5 (raw file):

				}

				for matchLen := 1; matchLen < nOrderingCols; matchLen++ {

I think we should pull this loop out so that we pass matchLen to generateRandomDataForTestSort. Then we will avoid the need for the inputSorter.


pkg/sql/colexec/sorttopk_test.go, line 171 at r5 (raw file):

						}
					}
					colexectestutils.AssertTuplesOrderedEqual(expected, actual, evalCtx)

I think it'd be good to actually use colexectestutils.RunTests test harness since it randomizes the batch sizes and the selection vector.

rharding6373 force-pushed the topk_partial_order_69724 branch from 8bd7750 to da0d73b (September 10, 2021 18:45)
Collaborator Author

rharding6373 left a comment

Thanks for the feedback! I addressed the other comments so that the code is in a better state before attempting to templatize topk. I suspect migrating topKSorter to a template is going to take more than a day, so this is going to be an ongoing flex project.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rharding6373 and @yuzefovich)


pkg/sql/colexec/sorttopk.go, line 121 at r6 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

We can allocate group precisely in the constructor.

Done.


pkg/sql/colexec/sorttopk.go, line 361 at r6 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Hm, this sentence makes me wonder whether we currently have a correctness issue because of the different directions of columns. Imagine the following test case:

CREATE TABLE t (c1 INT, c2 INT, c3 INT);
INSERT INTO t VALUES (0, 0, 0), (0, 1, 1);

and for the query

SELECT * FROM t ORDER BY c1 ASC, c2 DESC, c3 ASC;

we already have the partial ordering on c1, c2. In this case the first row has a smaller group id, yet it differs on the second column which has a different direction from the first column (which has equal values). I think we currently will incorrectly return the result in such a scenario.

Am I missing something?

If I'm correct, we might be able to take the full advantage of this group id optimization only if all partially ordered columns have the same direction; if not, then maybe it is correct only to skip first matchLen columns in case the group ids are the same because these group ids only capture whether two rows differ on the partially ordered columns, but not on which columns.

I reworked the "partial order multi col" test case to cover this and you're right, this isn't a correct implementation. I removed the bit for different groups and added a TODO for further optimization.


pkg/sql/colexec/sorttopk.go, line 378 at r6 (raw file):

Quoted 381 lines of code…
// NewTopKSorter returns a new sort operator, which sorts its input on the
// columns given in orderingCols and returns the first K rows. The inputTypes
// must correspond 1-1 with the columns in the input operator. If matchLen is
// non-zero, then the input tuples must be sorted on first matchLen columns.
func NewTopKSorter(
	allocator *colmem.Allocator,
	input colexecop.Operator,
	inputTypes []*types.T,
	orderingCols []execinfrapb.Ordering_Column,
	matchLen int,
	k uint64,
	maxOutputBatchMemSize int64,
) (colexecop.ResettableOperator, error) {
	if matchLen < 0 {
		return nil, errors.AssertionFailedf("invalid matchLen %v", matchLen)
	}
	base := &topKSorter{
		allocator:             allocator,
		OneInputNode:          colexecop.NewOneInputNode(input),
		inputTypes:            inputTypes,
		orderingCols:          orderingCols,
		k:                     k,
		hasPartialOrder:       matchLen > 0,
		matchLen:              matchLen,
		maxOutputBatchMemSize: maxOutputBatchMemSize,
	}
	if base.hasPartialOrder {
		partialOrderCols := make([]uint32, matchLen)
		for i := range partialOrderCols {
			partialOrderCols[i] = orderingCols[i].ColIdx
		}
		var err error
		base.distincterInput = &colexecop.FeedOperator{}
		base.distincter, base.distinctOutput, err = colexecbase.OrderedDistinctColsToOperators(
			base.distincterInput, partialOrderCols, inputTypes, false, /* nullsAreDistinct */
		)
		if err != nil {
			return base, err
		}
	}
	return base, nil
}
var _ colexecop.BufferingInMemoryOperator = &topKSorter{}
var _ colexecop.Resetter = &topKSorter{}
// topKSortState represents the state of the sort operator.
type topKSortState int
const (
	// topKSortSpooling is the initial state of the operator, where it spools
	// its input.
	topKSortSpooling topKSortState = iota
	// topKSortEmitting is the second state of the operator, indicating that
	// each call to Next will return another batch of the sorted data.
	topKSortEmitting
	// topKSortDone is the final state of the operator, where it always returns
	// a zero batch.
	topKSortDone
)
type topKSorter struct {
	colexecop.OneInputNode
	colexecop.InitHelper
	allocator       *colmem.Allocator
	orderingCols    []execinfrapb.Ordering_Column
	inputTypes      []*types.T
	k               uint64
	matchLen        int
	hasPartialOrder bool
	// state is the current state of the sort.
	state topKSortState
	// inputBatch is the last read batch from the input.
	inputBatch coldata.Batch
	// firstUnprocessedTupleIdx indicates the index of the first tuple in
	// inputBatch that hasn't been processed yet.
	firstUnprocessedTupleIdx int
	// comparators stores one comparator per ordering column.
	comparators []vecComparator
	// topK stores the top K rows. It is not sorted internally.
	topK *colexecutils.AppendOnlyBufferedBatch
	// heap is a max heap which stores indices into topK.
	heap []int
	// sel is a selection vector which specifies an ordering on topK.
	sel []int
	// group stores the group number associated with each entry in topK.
	group []int
	// emitted is the count of rows which have been emitted so far.
	emitted               int
	output                coldata.Batch
	maxOutputBatchMemSize int64
	// distincter is an operator that groups an input batch by its partially
	// ordered column values.
	distincterInput *colexecop.FeedOperator
	distincter      colexecop.Operator
	distinctOutput  []bool
	exportedFromTopK  int
	exportedFromBatch int
	windowedBatch     coldata.Batch
}
func (t *topKSorter) Init(ctx context.Context) {
	if !t.InitHelper.Init(ctx) {
		return
	}
	t.Input.Init(t.Ctx)
	t.topK = colexecutils.NewAppendOnlyBufferedBatch(t.allocator, t.inputTypes, nil /* colsToStore */)
	// TODO(harding): We only need to create a vecComparator for the input vectors in orderingCols.
	t.comparators = make([]vecComparator, len(t.inputTypes))
	for i, typ := range t.inputTypes {
		t.comparators[i] = GetVecComparator(typ, 2)
	}
	// TODO(yuzefovich): switch to calling this method on allocator. This will
	// require plumbing unlimited allocator to work correctly in tests with
	// memory limit of 1.
	t.windowedBatch = coldata.NewMemBatchNoCols(t.inputTypes, coldata.BatchSize())
	if t.hasPartialOrder {
		t.distincter.Init(t.Ctx)
	}
}
func (t *topKSorter) Next() coldata.Batch {
	for {
		switch t.state {
		case topKSortSpooling:
			t.spool()
			t.state = topKSortEmitting
		case topKSortEmitting:
			output := t.emit()
			if output.Length() == 0 {
				t.state = topKSortDone
				continue
			}
			return output
		case topKSortDone:
			return coldata.ZeroBatch
		default:
			colexecerror.InternalError(errors.AssertionFailedf("invalid sort state %v", t.state))
			// This code is unreachable, but the compiler cannot infer that.
			return nil
		}
	}
}
func (t *topKSorter) Reset(ctx context.Context) {
	if r, ok := t.Input.(colexecop.Resetter); ok {
		r.Reset(ctx)
	}
	t.state = topKSortSpooling
	t.firstUnprocessedTupleIdx = 0
	t.topK.ResetInternalBatch()
	t.emitted = 0
	if t.hasPartialOrder {
		t.distincter.(colexecop.Resetter).Reset(t.Ctx)
	}
}
//gcassert:inline
func (t *topKSorter) nextBatch() {
	t.inputBatch = t.Input.Next()
	if t.hasPartialOrder {
		t.distincterInput.SetBatch(t.inputBatch)
		t.distincter.Next()
	}
	t.firstUnprocessedTupleIdx = 0
}
// spool reads in the entire input, always storing the top K rows it has seen so
// far in t.topK. This is done by maintaining a max heap of indices into t.topK.
// Whenever we encounter a row which is smaller than the max row in the heap,
// we replace the max with that row.
//
// After all the input has been read, we pop everything off the heap to
// determine the final output ordering. This is used in emit() to output the rows
// in sorted order.
func (t *topKSorter) spool() {
	// Fill up t.topK by spooling up to K rows from the input.
	// We don't need to check for distinct groups until after we have filled
	// t.topK.
	// TODO(harding): We could emit the first N < K rows if the N rows are in one
	// or more distinct and complete groups, and then use a K-N size heap to find
	// the remaining top K-N rows.
	t.nextBatch()
	remainingRows := t.k
	groupId := 0
	for remainingRows > 0 && t.inputBatch.Length() > 0 {
		fromLength := t.inputBatch.Length()
		if remainingRows < uint64(t.inputBatch.Length()) {
			// t.topK will be full after this batch.
			fromLength = int(remainingRows)
		}
		t.firstUnprocessedTupleIdx = fromLength
		t.topK.AppendTuples(t.inputBatch, 0 /* startIdx */, fromLength)
		remainingRows -= uint64(fromLength)
		// Find the group id for each tuple just added to topK.
		if t.hasPartialOrder {
			sel := t.inputBatch.Selection()
			for i := 0; i < fromLength; i++ {
				idx := i
				if sel != nil {
					idx = sel[i]
				}
				if t.distinctOutput[idx] {
					groupId++
				}
				t.group = append(t.group, groupId)
			}
		}
		if fromLength == t.inputBatch.Length() {
			t.nextBatch()
		}
	}
	t.updateComparators(topKVecIdx, t.topK)
	// Initialize the heap.
	if cap(t.heap) < t.topK.Length() {
		t.heap = make([]int, t.topK.Length())
	} else {
		t.heap = t.heap[:t.topK.Length()]
	}
	for i := range t.heap {
		t.heap[i] = i
	}
	heap.Init(t)
	// Read the remainder of the input. Whenever a row is less than the heap max,
	// swap it in. When we find the end of the group, we can finish reading the
	// input.
	groupDone := false
	for t.inputBatch.Length() > 0 {
		t.updateComparators(inputVecIdx, t.inputBatch)
		sel := t.inputBatch.Selection()
		t.allocator.PerformOperation(
			t.topK.ColVecs(),
			func() {
				for i := t.firstUnprocessedTupleIdx; i < t.inputBatch.Length(); i++ {
					idx := i
					if sel != nil {
						idx = sel[i]
					}
					// If this is a distinct group, we have already found the top K input,
					// so we can stop comparing the rest of this and subsequent batches.
					if t.hasPartialOrder && t.distinctOutput[idx] {
						groupDone = true
						return
					}
					maxIdx := t.heap[0]
					groupMaxIdx := 0
					if t.hasPartialOrder {
						groupMaxIdx = t.group[maxIdx]
					}
					if t.compareRow(inputVecIdx, topKVecIdx, idx, maxIdx, groupId, groupMaxIdx) < 0 {
						for j := range t.inputTypes {
							t.comparators[j].set(inputVecIdx, topKVecIdx, idx, maxIdx)
						}
						if t.hasPartialOrder {
							t.group[maxIdx] = groupId
						}
						heap.Fix(t, 0)
					}
				}
				t.firstUnprocessedTupleIdx = t.inputBatch.Length()
			},
		)
		if groupDone {
			break
		}
		t.nextBatch()
	}
	// t.topK now contains the top K rows unsorted. Create a selection vector
	// which specifies the rows in sorted order by popping everything off the
	// heap. Note that it's a max heap so we need to fill the selection vector in
	// reverse.
	t.sel = make([]int, t.topK.Length())
	for i := 0; i < t.topK.Length(); i++ {
		t.sel[len(t.sel)-i-1] = heap.Pop(t).(int)
	}
}
func (t *topKSorter) emit() coldata.Batch {
	toEmit := t.topK.Length() - t.emitted
	if toEmit == 0 {
		// We're done.
		return coldata.ZeroBatch
	}
	t.output, _ = t.allocator.ResetMaybeReallocate(t.inputTypes, t.output, toEmit, t.maxOutputBatchMemSize)
	if toEmit > t.output.Capacity() {
		toEmit = t.output.Capacity()
	}
	for i := range t.inputTypes {
		vec := t.output.ColVec(i)
		// At this point, we have already fully sorted the input. It is ok to do
		// this Copy outside of the allocator - the work has been done, but
		// theoretically it is possible to hit the limit here (mainly with
		// variable-sized types like Bytes). Nonetheless, for performance reasons
		// it would be sad to fallback to disk at this point.
		vec.Copy(
			coldata.SliceArgs{
				Src:         t.topK.ColVec(i),
				Sel:         t.sel,
				SrcStartIdx: t.emitted,
				SrcEndIdx:   t.emitted + toEmit,
			},
		)
	}
	t.output.SetLength(toEmit)
	t.emitted += toEmit
	return t.output
}
func (t *topKSorter) compareRow(
	vecIdx1, vecIdx2 int, rowIdx1, rowIdx2 int, groupIdx1, groupIdx2 int,
) int {
	for i := range t.orderingCols {
		res := 0
		info := t.orderingCols[i]
		if t.hasPartialOrder && i < t.matchLen {
			// groupIdx1 and groupIdx2 are already group numbers (callers pass
			// values read from t.group), so they are compared directly.
			// If the tuples being compared are in the same group, we only need to
			// compare the columns that are not already ordered.
			if groupIdx1 == groupIdx2 {
				continue
			}
			// The tuples are in different groups, so we don't need to compare actual
			// columns. We use the direction of the first column to determine the
			// ultimate comparison result.
			if groupIdx1 < groupIdx2 {
				res = 1
			} else {
				res = -1
			}
		} else {
			res = t.comparators[info.ColIdx].compare(vecIdx1, vecIdx2, rowIdx1, rowIdx2)
		}
		if res != 0 {
			switch d := info.Direction; d {
			case execinfrapb.Ordering_Column_ASC:
				return res
			case execinfrapb.Ordering_Column_DESC:
				return -res
			default:
				colexecerror.InternalError(errors.AssertionFailedf("unexpected direction value %d", d))
			}
		}
	}
	return 0
}
func (t *topKSorter) updateComparators(vecIdx int, batch coldata.Batch) {
	for i := range t.inputTypes {
		t.comparators[i].setVec(vecIdx, batch.ColVec(i))
	}
}
func (t *topKSorter) ExportBuffered(colexecop.Operator) coldata.Batch {
	topKLen := t.topK.Length()
	// First, we check whether we have exported all tuples from the topK vector.
	if t.exportedFromTopK < topKLen {
		newExportedFromTopK := t.exportedFromTopK + coldata.BatchSize()
		if newExportedFromTopK > topKLen {
			newExportedFromTopK = topKLen
		}
		for i := range t.inputTypes {
			window := t.topK.ColVec(i).Window(t.exportedFromTopK, newExportedFromTopK)
			t.windowedBatch.ReplaceCol(window, i)
		}
		t.windowedBatch.SetSelection(false)
		t.windowedBatch.SetLength(newExportedFromTopK - t.exportedFromTopK)
		t.exportedFromTopK = newExportedFromTopK
		return t.windowedBatch
	}
	// Next, we check whether we have exported all tuples from the last read
	// batch.
	if t.inputBatch != nil && t.firstUnprocessedTupleIdx+t.exportedFromBatch < t.inputBatch.Length() {
		colexecutils.MakeWindowIntoBatch(
			t.windowedBatch, t.inputBatch, t.firstUnprocessedTupleIdx, t.inputBatch.Length(), t.inputTypes,
		)
		t.exportedFromBatch = t.windowedBatch.Length()
		return t.windowedBatch
	}
	return coldata.ZeroBatch
}
// Len is part of heap.Interface and is only meant to be used internally.
func (t *topKSorter) Len() int {
	return len(t.heap)
}
// Less is part of heap.Interface and is only meant to be used internally.
func (t *topKSorter) Less(i, j int) bool {
	groupi := 0
	groupj := 0
	if t.hasPartialOrder {
		groupi = t.group[t.heap[i]]
		groupj = t.group[t.heap[j]]
	}
	return t.compareRow(topKVecIdx, topKVecIdx, t.heap[i], t.heap[j], groupi, groupj) > 0
}
// Swap is part of heap.Interface and is only meant to be used internally.

pkg/sql/colexec/sorttopk_test.go, line 96 at r2 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Hm, I was mostly concerned about adding the "group id" optimization (especially during heap.Init), so I wanted to increase the test coverage of the top K sort with partial ordering.

Got it.


pkg/sql/colexec/sorttopk_test.go, line 127 at r5 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: maybe nCols <= maxCols?

Done.


pkg/sql/colexec/sorttopk_test.go, line 141 at r5 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

To simplify this a bit we could populate expected with all nTups tuples and then use expected[:k] when testing the top K.

Done.


pkg/sql/colexec/sorttopk_test.go, line 153 at r5 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think we should pull this loop out so that we pass matchLen to generateRandomDataForTestSort. Then we will avoid the need for the inputSorter.

Done.


pkg/sql/colexec/sorttopk_test.go, line 171 at r5 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think it'd be good to actually use colexectestutils.RunTests test harness since it randomizes the batch sizes and the selection vector.

Done.

Member

@yuzefovich yuzefovich left a comment

Re: templating - I think it'll be ok to merge this change without the templating assuming that we introduce the templating before 22.1 release to get the perf hit back.

BTW you could merge your previous work on the top K (first commits in this PR), and I'll take another close look at the improvement to the topKSorter.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rharding6373 and @yuzefovich)


pkg/sql/colexec/sorttopk_tmpl.go, line 31 at r9 (raw file):

// execgen:template<partialOrder>
//gcassert:inline

There is actually execgen:inline directive which we probably want to use here.


pkg/sql/colexec/sorttopk_tmpl.go, line 158 at r9 (raw file):

	t *topKSorter, vecIdx1 int, vecIdx2 int, rowIdx1 int, rowIdx2 int, partialOrder bool,
) int {
	if partialOrder {

Yeah, I see what you mean about having to perform this check on every call. I think the older way of templates would work better here. The idea is that we would have two structs topKSorter and topKSorterPartialOrder, and then we would have different Less function implementations for them.

Collaborator Author

@rharding6373 rharding6373 left a comment

Would it be better to include the current template changes, then refactor/rework for better performance?

In general, the templated version performs better:

Without template performance over base (includes partial order benchmarking): https://gist.github.com/rharding6373/a5bda2759799996daf0d23f08cd85e48

With template performance over base: https://gist.github.com/rharding6373/aec53b113b8ce79763f6db6528d9921e

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rharding6373 and @yuzefovich)

@yuzefovich
Member

I'd probably choose between either not including the templating changes or completing them to the expected final form, but it is really up to you.

@rharding6373 rharding6373 force-pushed the topk_partial_order_69724 branch 3 times, most recently from fcce1ca to 954179f Compare September 17, 2021 23:41
Collaborator Author

@rharding6373 rharding6373 left a comment

Fixed up the template and piped some distsql changes through so that the database can use this feature even if it's not reflected in the cost model yet. The latest benchmark numbers are here and demonstrate minimal performance impact to unsorted input, and significant gains when the input is partially ordered: https://gist.github.com/rharding6373/768fd567529347a0bde1160a68e884a7

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rharding6373 and @yuzefovich)


pkg/sql/colexec/sorttopk_tmpl.go, line 31 at r9 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

There is actually execgen:inline directive which we probably want to use here.

Done.


pkg/sql/colexec/sorttopk_tmpl.go, line 158 at r9 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Yeah, I see what you mean about having to perform this check on every call. I think the older way of templates would work better here. The idea is that we would have two structs topKSorter and topKSorterPartialOrder, and then we would have different Less function implementations for them.

Jordan gave me a great suggestion similar to yours, but with separate structs that only implement Less and are composed with topKSorter. That seems to have made the overhead negligible.

@rharding6373 rharding6373 marked this pull request as ready for review September 17, 2021 23:42
@rharding6373 rharding6373 requested review from a team as code owners September 17, 2021 23:42
@rharding6373 rharding6373 requested a review from rytaft September 17, 2021 23:42
@rharding6373 rharding6373 force-pushed the topk_partial_order_69724 branch from 954179f to 9af3556 Compare September 17, 2021 23:56
Member

@yuzefovich yuzefovich left a comment

Nice! I have a bunch of nits, but I think it's basically good to go.

Reviewed 34 of 34 files at r15, 41 of 41 files at r16, 36 of 36 files at r17, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rharding6373 and @rytaft)


-- commits, line 18 at r15:
nit: "ordered ordered".


-- commits, line 65 at r16:
super nit: wrap the line at 80 characters.


-- commits, line 67 at r16:
nit: we usually do Release note: None.


-- commits, line 69 at r16:
nit: no longer needed.


-- commits, line 86 at r17:
nit: no longer needed.


pkg/sql/colexec/sorttopk.go, line 120 at r14 (raw file):

	t.windowedBatch = coldata.NewMemBatchNoCols(t.inputTypes, coldata.BatchSize())
}

nit: looks like empty lines are missing between different functions (usually we use exactly one empty line) here and in several places below.


pkg/sql/colexec/sorttopk.go, line 118 at r16 (raw file):

	// sel is a selection vector which specifies an ordering on topK.
	sel []int
	// group stores the group number associated with each entry in topK.

nit: this field is only used if hasPartialOrder == true, what do you think about grouping all such fields? I'm thinking of something like:

partialOrderState struct {
  group []int
  distincterInput *colexecop.FeedOperator
  distincter colexecop.Operator
  distinctOutput []bool
}

pkg/sql/colexec/sorttopk.go, line 140 at r16 (raw file):

	t.Input.Init(t.Ctx)
	t.topK = colexecutils.NewAppendOnlyBufferedBatch(t.allocator, t.inputTypes, nil /* colsToStore */)
	// TODO(harding): We only need to create a vecComparator for the input vectors in orderingCols.

nit: wrap at 80.


pkg/sql/colexec/sorttopk.go, line 344 at r16 (raw file):

		if t.hasPartialOrder && i < t.matchLen && groupIdx1 == groupIdx2 {
			// If the tuples being compared are in the same group, we only need to
			// compare the columns that are not already ordered.

nit: this is a good place to put the TODO for the case groupIdx1 != groupIdx2 (I didn't find that TODO, ignore if it's already somewhere).


pkg/sql/colexec/sorttopk.go, line 78 at r17 (raw file):

	return base, nil

	//return newTopKSorter(allocator, input, inputTypes, orderingCols, matchLen, k, maxOutputBatchMemSize)

leftover


pkg/sql/colexec/sorttopk_test.go, line 128 at r15 (raw file):

			for matchLen := 1; matchLen < nOrderingCols; matchLen++ {
				tups, _, ordCols := generateRandomDataForTestSort(rng, nTups, nCols, nOrderingCols, matchLen)
				input := colexectestutils.NewOpTestInput(testAllocator, 1 /* batchSize */, tups, typs[:nCols])

super nit: since here we're not interested in increasing the test coverage, I would use coldata.BatchSize() for the batch size argument.


pkg/sql/colexec/sorttopk_test.go, line 142 at r15 (raw file):

					}
				}
				// Test values for k need to be monotonically increasing, so we can reuse

nit: I'm a bit confused by this comment, maybe it's a leftover now?


pkg/sql/colexec/sorttopk_test.go, line 204 at r15 (raw file):

}

func makeOrdCols(

nit: maybe use a bit more descriptive name for the function as well as add a quick comment on what it does?


pkg/sql/colexec/sorttopk_tmpl.go, line 158 at r9 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

Jordan gave me a great suggestion similar to yours, but with separate structs that only implement Less and are composed with topKSorter. That seems to have made the overhead negligible.

Nice!


pkg/sql/colexec/sorttopk_tmpl.go, line 14 at r17 (raw file):

// +build execgen_template
//
// This file is the execgen template for sorttopk_tmpl.eg.go. It's formatted in a

nit: s/sorttopk_tmpl.eg.go/sorttopk.eg.go/.


pkg/sql/colexec/sorttopk_tmpl.go, line 71 at r17 (raw file):

			for i, k := 0, t.topK.Length(); i < fromLength; i, k = i+1, k+1 {
				idx := i
				if sel != nil {

Now that we're templating the code, we could also template out sel == nil vs sel != nil case (here and below) to make this check once per batch instead of once per tuple (we do similar templating in most places).
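
As a rough illustration of the suggestion (not the generated code, and with simplified assumptions: `sumSelected`, `vals`, and `sel` are hypothetical, and a nil `sel` is taken to mean every row is live), hoisting the check means branching once per batch and then running a loop with no per-tuple conditional:

```go
package main

import "fmt"

// sumSelected adds up the values of a hypothetical column. A nil sel means
// that every row of vals is live; otherwise sel lists the live row indices.
func sumSelected(vals []int, sel []int) int {
	total := 0
	// The sel check happens once here instead of once per tuple.
	if sel != nil {
		for _, idx := range sel {
			total += vals[idx]
		}
	} else {
		for _, v := range vals {
			total += v
		}
	}
	return total
}

func main() {
	vals := []int{10, 20, 30, 40}
	fmt.Println(sumSelected(vals, nil), sumSelected(vals, []int{1, 3}))
}
```

In templated code the two loop bodies would be generated from one source, so the duplication does not have to be maintained by hand.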


pkg/sql/colexec/sorttopk_tmpl.go, line 143 at r17 (raw file):

			},
		)
		if groupDone {

nit: groupDone can only be true if partialOrder == true, so maybe worth wrapping it with a template conditional? I'm guessing that it shouldn't have an impact on the performance because if partialOrder == false, then groupDone is never updated, and the compiler should remove the check entirely (but maybe it's not the case, and we can help the compiler :) ).


pkg/sql/colexec/sorttopk_tmpl.go, line 209 at r17 (raw file):

}

// topKHeaper implements part of the heap.Interface for non-ordered input.

nit: in order to show and enforce that a struct implements an interface we usually use the following:

var _ heap.Interface = &topKHeaper{}

pkg/sql/colexec/colexectestutils/utils.go, line 1407 at r15 (raw file):

// AssertTuplesOrderedEqual asserts that two permutations of tuples are equal
// in order.
func AssertTuplesOrderedEqual(expected Tuples, actual Tuples, evalCtx *tree.EvalContext) error {

nit: I think we no longer need to export this.


pkg/sql/colexec/execgen/cmd/execgen/sorttopk_gen.go, line 23 at r17 (raw file):

const sortTopKTmpl = "pkg/sql/colexec/sorttopk_tmpl.go"

func genSortTopK(inputFileContents string, wr io.Writer) error {

The current genSortTopK is set up for the old template style, and I think you're only using the new style, so this function can be simplified. I think you only need something like hash_aggregator_gen.go.


pkg/sql/opt/exec/execbuilder/relational.go, line 1534 at r16 (raw file):

	}
	ordering := e.Ordering.ToOrdering()
	inputOrdering := e.Input.ProvidedPhysical().Ordering

@rytaft could you please take a quick look at the optimizer changes here?

This change adds unit, randomized, and benchmark testing for the topk
sorter to add coverage of partially ordered columns. A
subsequent commit will add changes to the topk sorter in the vectorized
execution engine to take advantage of partial ordering for performance
improvements.

Release note: None
Collaborator

@rytaft rytaft left a comment

:lgtm: once you address items from @yuzefovich

Reviewed 18 of 32 files at r4, 31 of 60 files at r14, 3 of 34 files at r15, 11 of 41 files at r16, 7 of 36 files at r17, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @rharding6373)


pkg/sql/colexec/sorttopk_test.go, line 159 at r17 (raw file):

					name := fmt.Sprintf("nCols=%d/nOrderingCols=%d/matchLen=%d/k=%d", nCols, nOrderingCols, matchLen, k)
					log.Infof(ctx, "%s", name)
					colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tups}, expected[:k], colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {

nit: wrap code lines at ~100 chars


pkg/sql/colexec/sorttopk_tmpl.go, line 50 at r17 (raw file):

// in sorted order.
// execgen:template<partialOrder>
func spool(t *topKSorter, partialOrder bool) {

nit: mention the partialOrder param in the function comment

@rharding6373 rharding6373 force-pushed the topk_partial_order_69724 branch from 9af3556 to 9a5ab4a Compare September 21, 2021 02:00
Collaborator Author

@rharding6373 rharding6373 left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @rharding6373, @rytaft, and @yuzefovich)


-- commits, line 65 at r16:

Previously, yuzefovich (Yahor Yuzefovich) wrote…

super nit: wrap the line at 80 characters.

Done.


-- commits, line 69 at r16:

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: no longer needed.

Done.


pkg/sql/colexec/sorttopk.go, line 118 at r16 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: this field is only used if hasPartialOrder == true, what do you think about grouping all such fields? I'm thinking of something like:

partialOrderState struct {
  group []int
  distincterInput *colexecop.FeedOperator
  distincter colexecop.Operator
  distinctOutput []bool
}

Done.


pkg/sql/colexec/sorttopk.go, line 344 at r16 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: this is a good place to put the TODO for the case groupIdx1 != groupIdx2 (I didn't find that TODO, ignore if it's already somewhere).

Added a TODO in sorttopk_tmpl.go


pkg/sql/colexec/sorttopk.go, line 78 at r17 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

leftover

Done.


pkg/sql/colexec/sorttopk_test.go, line 128 at r15 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

super nit: since here we're not interested in increasing the test coverage, I would use coldata.BatchSize() for the batch size argument.

Done.


pkg/sql/colexec/sorttopk_test.go, line 142 at r15 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: I'm a bit confused by this comment, maybe it's a leftover now?

It was, thanks for the catch.


pkg/sql/colexec/sorttopk_test.go, line 204 at r15 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: maybe use a bit more descriptive name for the function as well as add a quick comment on what it does?

Done.


pkg/sql/colexec/sorttopk_tmpl.go, line 158 at r9 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Nice!

Resolved


pkg/sql/colexec/sorttopk_tmpl.go, line 50 at r17 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

nit: mention the partialOrder param in the function comment

Done.


pkg/sql/colexec/sorttopk_tmpl.go, line 71 at r17 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Now that we're templating the code, we could also template out sel == nil vs sel != nil case (here and below) to make this check once per batch instead of once per tuple (we do similar templating in most places).

Done.


pkg/sql/colexec/sorttopk_tmpl.go, line 143 at r17 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: groupDone can only be true if partialOrder == true, so maybe worth wrapping it with a template conditional? I'm guessing that it shouldn't have an impact on the performance because if partialOrder == false, then groupDone is never updated, and the compiler should remove the check entirely (but maybe it's not the case, and we can help the compiler :) ).

Done.


pkg/sql/colexec/sorttopk_tmpl.go, line 209 at r17 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: in order to show and enforce that a struct implements an interface we usually use the following:

var _ heap.Interface = &topKHeaper{}

Done.


pkg/sql/colexec/execgen/cmd/execgen/sorttopk_gen.go, line 23 at r17 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

The current genSortTopK is set up for the old template style, and I think you're only using the new style, so this function can be simplified. I think you only need something like hash_aggregator_gen.go.

Thanks for the tip, done.


pkg/sql/colexec/colexectestutils/utils.go, line 1407 at r15 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: I think we no longer need to export this.

Done.

Member

@yuzefovich yuzefovich left a comment

Nice work! :lgtm:

nit: I think this PR closes #69724, right? We usually include a line like Fixes: #69724 in the PR description if so.

Reviewed 1 of 48 files at r18, 47 of 47 files at r19, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @rharding6373)


pkg/sql/colexec/sorttopk.go, line 118 at r16 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

Done.

By the way, not sure if you're already aware of this, but in Go it's possible to define an anonymous struct inside of another struct, i.e. something like

type outerStruct struct {
  a int
  b int
  innerStruct struct {
    c int
    d int
  }
}

It has an advantage of not exposing innerStruct to the whole package and a disadvantage of being annoying to instantiate innerStruct object separately from outerStruct.
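
A small runnable sketch of that trade-off, assuming a hypothetical `sorter` type (the `orderState` field name mirrors the one used elsewhere in this review; everything else is illustrative):

```go
package main

import "fmt"

// sorter is a hypothetical stand-in for an operator struct.
type sorter struct {
	k int
	// orderState groups the fields that are only used when the input is
	// partially ordered. Its struct type has no name of its own.
	orderState struct {
		group          []int
		distinctOutput []bool
	}
}

// newSorter returns a sorter whose orderState starts out zero-valued.
func newSorter(k int) sorter {
	var s sorter
	s.k = k
	return s
}

func main() {
	s := newSorter(2)
	// Fields of the anonymous struct are reached through the outer value.
	s.orderState.group = append(s.orderState.group, 0, 1)
	fmt.Println(s.k, len(s.orderState.group))

	// Building the inner value separately means spelling out the struct type
	// again, which is the inconvenience mentioned above.
	inner := struct {
		group          []int
		distinctOutput []bool
	}{group: []int{7}}
	s.orderState = inner
	fmt.Println(s.orderState.group)
}
```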


pkg/sql/colexec/execgen/cmd/execgen/sorttopk_gen.go, line 26 at r19 (raw file):

func init() {
	/*

nit: leftover.

Collaborator

@rytaft rytaft left a comment

:lgtm:

Reviewed 1 of 48 files at r18, 47 of 47 files at r19, all commit messages.
Reviewable status: :shipit: complete! 2 of 0 LGTMs obtained (waiting on @rharding6373)


pkg/sql/colexec/sorttopk_test.go, line 217 at r19 (raw file):

}

// generatePartiallyOrderedColumns generates randomized input data with nCols columns, where the

nit: comments should be <= 80 cols wide


pkg/sql/colexec/sorttopk_tmpl.go, line 96 at r19 (raw file):

				t.orderState.group[maxIdx] = groupId
			}
			if partialOrder {

do you need this if block? looks like both branches do the same thing...

@rharding6373 rharding6373 force-pushed the topk_partial_order_69724 branch from 9a5ab4a to 95bb205 Compare September 21, 2021 16:26
Collaborator Author

@rharding6373 rharding6373 left a comment

TFTRs! For #69724 there are still opt-side updates that need to happen, namely updating the cost model. I'll keep it open until that's done.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @rharding6373, @rytaft, and @yuzefovich)


pkg/sql/colexec/sorttopk.go, line 118 at r16 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

By the way, not sure if you're already aware of this, but in Go it's possible to define an anonymous struct inside of another struct, i.e. something like

type outerStruct struct {
  a int
  b int
  innerStruct struct {
    c int
    d int
  }
}

It has an advantage of not exposing innerStruct to the whole package and a disadvantage of being annoying to instantiate innerStruct object separately from outerStruct.

Didn't know that, thanks for the tip!


pkg/sql/colexec/sorttopk_tmpl.go, line 96 at r19 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

do you need this if block? looks like both branches do the same thing...

Thanks for the catch!

Member

@yuzefovich yuzefovich left a comment

Sounds good.

Reviewed 4 of 4 files at r20, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @rharding6373 and @rytaft)


pkg/sql/colexec/sorttopk_test.go, line 217 at r19 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

nit: comments should be <= 80 cols wide

BTW if you're using Goland, there are some helpful things to set up. One is "Wrap to Column" plugin (Preferences -> Plugins), another is the visual guide - I have mine set at 80 characters (Preferences -> Editor -> Code Style).

Collaborator

@rytaft rytaft left a comment

For #69724 there are still opt-side updates that need to happen, namely updating the cost model. I'll keep it open until that's done.

In that case you might consider adding "Informs #69724", so GitHub will link it, but won't close the issue

Reviewed 4 of 4 files at r20, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @rharding6373)

Previously, topKSorter had to process all input rows before returning
the top K rows according to its specified ordering. If a subset of the
input rows were already ordered, topKSorter would still iterate over the
entire input.

However, if the input was partially ordered, topKSorter could
potentially stop iterating early, since after it has found K candidates
it is guaranteed not to find any better top candidates.

For example, take the following query and table with an index on a:

```
  a | b
----+----
  1 | 5
  2 | 3
  2 | 1
  3 | 3
  5 | 3

SELECT * FROM t ORDER BY a, b LIMIT 2
```

Given an index scan on a to provide a's ordering, topk only needs to
process 3 rows in order to guarantee that it has found the top K rows.
Once it finishes processing the third row [2, 1], all subsequent rows
have higher values of a than the top 2 rows found so far, and
therefore cannot be in the top 2 rows.

This change modifies the vectorized engine's TopKSorter signature to include
a partial ordering. The TopKSorter chunks the input according to the
sorted columns and processes each chunk with its existing heap
algorithm. Row comparison in the heap is also optimized so that tuples
in the same chunk only compare non-sorted columns. At the end
of each chunk, if K rows are in the heap, TopKSorter emits the rows and
stops execution.
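
The early exit described above can be sketched in plain Go as follows. This is a simplified illustration, not the vectorized implementation: `row`, `less`, `maxHeap`, and `topK` are hypothetical names, the input is assumed to arrive already sorted on a, and rows are processed one at a time instead of in batches.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// row is a hypothetical two-column tuple; the input is assumed to be sorted
// by a already (for example, by an index scan on a).
type row struct{ a, b int }

// less reports whether x sorts before y under ORDER BY a, b.
func less(x, y row) bool {
	if x.a != y.a {
		return x.a < y.a
	}
	return x.b < y.b
}

// maxHeap keeps the largest of the current candidates at index 0.
type maxHeap []row

func (h maxHeap) Len() int            { return len(h) }
func (h maxHeap) Less(i, j int) bool  { return less(h[j], h[i]) }
func (h maxHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *maxHeap) Push(x interface{}) { *h = append(*h, x.(row)) }
func (h *maxHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// topK returns the first k rows under ORDER BY a, b, together with the number
// of input rows it had to read.
func topK(input []row, k int) ([]row, int) {
	if k <= 0 {
		return nil, 0
	}
	h := &maxHeap{}
	read := 0
	for i, r := range input {
		// A change in a starts a new chunk. If the heap is already full, no
		// later row can beat the current candidates, so stop reading.
		if i > 0 && r.a != input[i-1].a && h.Len() == k {
			break
		}
		read++
		if h.Len() < k {
			heap.Push(h, r)
		} else if less(r, (*h)[0]) {
			// Replace the current maximum with the smaller row.
			(*h)[0] = r
			heap.Fix(h, 0)
		}
	}
	out := append([]row(nil), *h...)
	sort.Slice(out, func(i, j int) bool { return less(out[i], out[j]) })
	return out, read
}

func main() {
	input := []row{{1, 5}, {2, 3}, {2, 1}, {3, 3}, {5, 3}}
	out, read := topK(input, 2)
	fmt.Println(out, read) // prints "[{1 5} {2 1}] 3"
}
```

On the example table it reads three rows and stops at the row with a = 3, matching the walkthrough above.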

A later commit, once merged with top K optimizer and distsql changes,
will adjust the cost model for top K to reflect this change.

Informs cockroachdb#69724

Release note: None
@rharding6373 rharding6373 force-pushed the topk_partial_order_69724 branch from 95bb205 to afc78d5 Compare September 21, 2021 17:26
@rharding6373
Collaborator Author

bors r+

@craig craig bot merged commit af43657 into cockroachdb:master Sep 21, 2021
@craig
Contributor

craig bot commented Sep 21, 2021

Build succeeded:

@rharding6373 rharding6373 deleted the topk_partial_order_69724 branch September 21, 2021 22:40