Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: allow lookup joins to order on index columns #84689

Merged
merged 1 commit into from
Jul 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const (
// space for the results at the front of the line. This would occur when the
// budget limitBytes is reached and the size estimates that led to too much
// concurrency in the execution were wrong.
//
// When there are multiple results associated with a given request, they are
// sorted in lookup order for that request (though not globally).
InOrder
// OutOfOrder is the mode of operation in which the results are delivered in
// the order in which they're produced. The caller will use the keys field
Expand Down Expand Up @@ -424,7 +427,12 @@ func (s *Streamer) Init(
// The Streamer takes over the given requests, will perform the memory
// accounting against its budget and might modify the requests in place.
//
// In InOrder operation mode, responses will be delivered in reqs order.
// In InOrder operation mode, responses will be delivered in reqs order. When
// more than one row is returned for a given request, the rows for that request
// will be sorted in the order of the lookup index if the index contains only
// ascending columns.
// TODO(drewk): lift the restriction that index columns must be ASC in order to
// return results in lookup order.
//
// It is the caller's responsibility to ensure that the memory footprint of reqs
// (i.e. roachpb.Spans inside of the requests) is reasonable. Enqueue will
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2333,6 +2333,22 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
return nil, err
}

// If any of the ordering columns originate from the lookup table, this is a
// case where we are ordering on a prefix of input columns followed by the
// lookup columns. We need to maintain the index ordering on each lookup.
var maintainLookupOrdering bool
numInputCols := len(plan.GetResultTypes())
for i := range n.reqOrdering {
if n.reqOrdering[i].ColIdx >= numInputCols {
maintainLookupOrdering = true
if n.reqOrdering[i].Direction == encoding.Descending {
// Validate that an ordering on lookup columns does not contain
// descending columns.
panic(errors.AssertionFailedf("ordering on a lookup index with descending columns"))
}
}
}

joinReaderSpec := execinfrapb.JoinReaderSpec{
Type: n.joinType,
LockingStrength: n.table.lockingStrength,
Expand All @@ -2341,6 +2357,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
// is late in the sense that the cost of this has not been taken into
// account. Make this decision earlier in CustomFuncs.GenerateLookupJoins.
MaintainOrdering: len(n.reqOrdering) > 0 || n.isFirstJoinInPairedJoiner,
MaintainLookupOrdering: maintainLookupOrdering,
LeftJoinWithPairedJoiner: n.isSecondJoinInPairedJoiner,
OutputGroupContinuationForLeftRow: n.isFirstJoinInPairedJoiner,
LookupBatchBytesLimit: dsp.distSQLSrv.TestingKnobs.JoinReaderBatchBytesLimit,
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/execinfrapb/processors_sql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,17 @@ message JoinReaderSpec {
// Not used if there is a limit set in the PostProcessSpec of this processor
// (that value will be used for sizing batches instead).
optional int64 limit_hint = 21 [(gogoproto.nullable) = false];

// Indicates that for each input row, the join reader should return looked-up
// rows in sorted order. This is only applicable to lookup joins for which
// more than one lookup row may be associated with a given input row. It can
// only be set to true if maintain_ordering is also true.
// maintain_lookup_ordering can be used if the output needs to be ordered by
// a prefix of input columns followed by index (lookup) columns without
// requiring a (buffered) sort. As an additional restriction due to
// implementation details, maintain_lookup_ordering can only be used when the
// index columns that participate in the output ordering are all ASC.
optional bool maintain_lookup_ordering = 22 [(gogoproto.nullable) = false];
}

// SorterSpec is the specification for a "sorting aggregator". A sorting
Expand Down
98 changes: 98 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/lookup_join
Original file line number Diff line number Diff line change
Expand Up @@ -818,3 +818,101 @@ AND items.author_id != views.user_id
WHERE views.chat_id = 1 and views.user_id = 1;
----
1 1 NULL NULL NULL

# Test that lookup joins can supply an ordering on input and lookup columns.
# Compare with the output of a hash join with the same ordering (which will
# have to sort its output).

statement ok
CREATE TABLE xyz (x INT, y INT, z INT, PRIMARY KEY(x, y, z));

statement ok
CREATE TABLE uvw (u INT, v INT, w INT, PRIMARY KEY(u, v, w));

statement ok
INSERT INTO xyz VALUES (1, 1, 1), (1, 1, 2), (1, 2, 3), (2, 1, 4), (2, 1, 5), (2, 1, 6), (3, 1, 7);

statement ok
INSERT INTO uvw VALUES (1, 1, 1), (1, 2, 2), (1, 2, 3), (2, 1, 4), (2, 1, 5), (2, 2, 6), (2, 2, 7);

query IIIIII colnames
SELECT * FROM xyz INNER LOOKUP JOIN uvw ON x = u ORDER BY x, y, z, u, v, w
----
x y z u v w
1 1 1 1 1 1
1 1 1 1 2 2
1 1 1 1 2 3
1 1 2 1 1 1
1 1 2 1 2 2
1 1 2 1 2 3
1 2 3 1 1 1
1 2 3 1 2 2
1 2 3 1 2 3
2 1 4 2 1 4
2 1 4 2 1 5
2 1 4 2 2 6
2 1 4 2 2 7
2 1 5 2 1 4
2 1 5 2 1 5
2 1 5 2 2 6
2 1 5 2 2 7
2 1 6 2 1 4
2 1 6 2 1 5
2 1 6 2 2 6
2 1 6 2 2 7

query IIIIII colnames
SELECT * FROM xyz INNER HASH JOIN uvw ON x = u ORDER BY x, y, z, u, v, w
----
x y z u v w
1 1 1 1 1 1
1 1 1 1 2 2
1 1 1 1 2 3
1 1 2 1 1 1
1 1 2 1 2 2
1 1 2 1 2 3
1 2 3 1 1 1
1 2 3 1 2 2
1 2 3 1 2 3
2 1 4 2 1 4
2 1 4 2 1 5
2 1 4 2 2 6
2 1 4 2 2 7
2 1 5 2 1 4
2 1 5 2 1 5
2 1 5 2 2 6
2 1 5 2 2 7
2 1 6 2 1 4
2 1 6 2 1 5
2 1 6 2 2 6
2 1 6 2 2 7

query IIIIII colnames
SELECT * FROM xyz INNER LOOKUP JOIN uvw ON x = u AND y = v ORDER BY u, x, v, y, z, w
----
x y z u v w
1 1 1 1 1 1
1 1 2 1 1 1
1 2 3 1 2 2
1 2 3 1 2 3
2 1 4 2 1 4
2 1 4 2 1 5
2 1 5 2 1 4
2 1 5 2 1 5
2 1 6 2 1 4
2 1 6 2 1 5

query IIIIII colnames
SELECT * FROM xyz INNER HASH JOIN uvw ON x = u AND y = v ORDER BY u, x, v, y, z, w
----
x y z u v w
1 1 1 1 1 1
1 1 2 1 1 1
1 2 3 1 2 2
1 2 3 1 2 3
2 1 4 2 1 4
2 1 4 2 1 5
2 1 5 2 1 4
2 1 5 2 1 5
2 1 6 2 1 4
2 1 6 2 1 5
2 changes: 2 additions & 0 deletions pkg/sql/opt/ordering/interesting_orderings.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ func interestingOrderingsForJoin(rel memo.RelExpr) props.OrderingSet {
}
// For a join, we could conceivably preserve the order of one side (even with
// hash-join, depending on which side we store).
// TODO(drewk): add logic for orderings on columns from both sides, since both
// lookup and merge joins can provide them.
ordLeft := DeriveInterestingOrderings(rel.Child(0).(memo.RelExpr))
ordRight := DeriveInterestingOrderings(rel.Child(1).(memo.RelExpr))
ord := make(props.OrderingSet, 0, len(ordLeft)+len(ordRight))
Expand Down
Loading