From 3570fea611f0d81d0f20ef4f39ebe3b16ad43836 Mon Sep 17 00:00:00 2001 From: DrewKimball Date: Tue, 19 Jul 2022 16:54:43 -0700 Subject: [PATCH] opt: allow lookup joins to order on index columns It is possible for lookup joins to return the results of each lookup in the order of the lookup index. In the case when the input is ordered on a key, preserving the input ordering and then returning looked-up rows in index order is equivalent to performing a sort on the input ordering with the index columns appended. This patch teaches the optimizer that lookup joins can preserve the index ordering. This allows the optimizer to avoid sorting in some cases, which can significantly improve performance because sorts have to buffer all input rows. Due to implementation details of the lookup join, order can only be preserved when none of the index columns involved in the ordering are sorted in descending order. Fixes #84685 Release note (performance improvement): The optimizer can now return the results of a join in sorted order in more cases. This can allow the optimizer to avoid expensive sorts that need to buffer all input rows. --- pkg/kv/kvclient/kvstreamer/streamer.go | 10 +- pkg/sql/distsql_physical_planner.go | 17 + pkg/sql/execinfrapb/processors_sql.proto | 11 + .../logictest/testdata/logic_test/lookup_join | 98 +++++ pkg/sql/opt/ordering/interesting_orderings.go | 2 + pkg/sql/opt/ordering/lookup_join.go | 220 ++++++++++- pkg/sql/opt/ordering/lookup_join_test.go | 356 +++++++++++++++++- pkg/sql/opt/ordering/ordering.go | 4 +- pkg/sql/opt/xform/testdata/external/tpce | 9 +- .../opt/xform/testdata/external/tpce-no-stats | 39 +- .../opt/xform/testdata/external/tpch-no-stats | 15 +- pkg/sql/opt/xform/testdata/physprops/ordering | 191 +++++++++- pkg/sql/rowexec/joinreader.go | 31 +- 13 files changed, 933 insertions(+), 70 deletions(-) diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index caffb6463b92..ee94232a2da1 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -51,6 +51,9 @@ const ( // space for the results at the front of the line. This would occur when the // budget limitBytes is reached and the size estimates that lead to too much // concurrency in the execution were wrong. + // + // When there are multiple results associated with a given request, they are + // sorted in lookup order for that request (though not globally). InOrder // OutOfOrder is the mode of operation in which the results are delivered in // the order in which they're produced. The caller will use the keys field @@ -424,7 +427,12 @@ func (s *Streamer) Init( // The Streamer takes over the given requests, will perform the memory // accounting against its budget and might modify the requests in place. // -// In InOrder operation mode, responses will be delivered in reqs order. +// In InOrder operation mode, responses will be delivered in reqs order. When +// more than one row is returned for a given request, the rows for that request +// will be sorted in the order of the lookup index if the index contains only +// ascending columns. +// TODO(drewk): lift the restriction that index columns must be ASC in order to +// return results in lookup order. // // It is the caller's responsibility to ensure that the memory footprint of reqs // (i.e. roachpb.Spans inside of the requests) is reasonable. Enqueue will diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index ac22ff36a426..ea67bd53059e 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -2333,6 +2333,22 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin( return nil, err } + // If any of the ordering columns originate from the lookup table, this is a + // case where we are ordering on a prefix of input columns followed by the + // lookup columns. We need to maintain the index ordering on each lookup. + var maintainLookupOrdering bool + numInputCols := len(plan.GetResultTypes()) + for i := range n.reqOrdering { + if n.reqOrdering[i].ColIdx >= numInputCols { + maintainLookupOrdering = true + if n.reqOrdering[i].Direction == encoding.Descending { + // Validate that an ordering on lookup columns does not contain + // descending columns. + panic(errors.AssertionFailedf("ordering on a lookup index with descending columns")) + } + } + } + joinReaderSpec := execinfrapb.JoinReaderSpec{ Type: n.joinType, LockingStrength: n.table.lockingStrength, @@ -2341,6 +2357,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin( // is late in the sense that the cost of this has not been taken into // account. Make this decision earlier in CustomFuncs.GenerateLookupJoins. MaintainOrdering: len(n.reqOrdering) > 0 || n.isFirstJoinInPairedJoiner, + MaintainLookupOrdering: maintainLookupOrdering, LeftJoinWithPairedJoiner: n.isSecondJoinInPairedJoiner, OutputGroupContinuationForLeftRow: n.isFirstJoinInPairedJoiner, LookupBatchBytesLimit: dsp.distSQLSrv.TestingKnobs.JoinReaderBatchBytesLimit, diff --git a/pkg/sql/execinfrapb/processors_sql.proto b/pkg/sql/execinfrapb/processors_sql.proto index 960f9ba1a68e..684ce304fb83 100644 --- a/pkg/sql/execinfrapb/processors_sql.proto +++ b/pkg/sql/execinfrapb/processors_sql.proto @@ -388,6 +388,17 @@ message JoinReaderSpec { // Not used if there is a limit set in the PostProcessSpec of this processor // (that value will be used for sizing batches instead). optional int64 limit_hint = 21 [(gogoproto.nullable) = false]; + + // Indicates that for each input row, the join reader should return looked-up + // rows in sorted order. This is only applicable to lookup joins for which + // more than one lookup row may be associated with a given input row. It can + // only be set to true if maintain_ordering is also true. + // maintain_lookup_ordering can be used if the output needs to be ordered by + // a prefix of input columns followed by index (lookup) columns without + // requiring a (buffered) sort. As an additional restriction due to + // implementation details, maintain_lookup_ordering can only be used when the + // index columns that participate in the output ordering are all ASC. + optional bool maintain_lookup_ordering = 22 [(gogoproto.nullable) = false]; } // SorterSpec is the specification for a "sorting aggregator". A sorting diff --git a/pkg/sql/logictest/testdata/logic_test/lookup_join b/pkg/sql/logictest/testdata/logic_test/lookup_join index 41c886b12a9b..d3ff2a6735d1 100644 --- a/pkg/sql/logictest/testdata/logic_test/lookup_join +++ b/pkg/sql/logictest/testdata/logic_test/lookup_join @@ -818,3 +818,101 @@ AND items.author_id != views.user_id WHERE views.chat_id = 1 and views.user_id = 1; ---- 1 1 NULL NULL NULL + +# Test that lookup joins can supply an ordering on input and lookup columns. +# Compare with the output of a hash join with the same ordering (which will +# have to sort its output). + +statement ok +CREATE TABLE xyz (x INT, y INT, z INT, PRIMARY KEY(x, y, z)); + +statement ok +CREATE TABLE uvw (u INT, v INT, w INT, PRIMARY KEY(u, v, w)); + +statement ok +INSERT INTO xyz VALUES (1, 1, 1), (1, 1, 2), (1, 2, 3), (2, 1, 4), (2, 1, 5), (2, 1, 6), (3, 1, 7); + +statement ok +INSERT INTO uvw VALUES (1, 1, 1), (1, 2, 2), (1, 2, 3), (2, 1, 4), (2, 1, 5), (2, 2, 6), (2, 2, 7); + +query IIIIII colnames +SELECT * FROM xyz INNER LOOKUP JOIN uvw ON x = u ORDER BY x, y, z, u, v, w +---- +x y z u v w +1 1 1 1 1 1 +1 1 1 1 2 2 +1 1 1 1 2 3 +1 1 2 1 1 1 +1 1 2 1 2 2 +1 1 2 1 2 3 +1 2 3 1 1 1 +1 2 3 1 2 2 +1 2 3 1 2 3 +2 1 4 2 1 4 +2 1 4 2 1 5 +2 1 4 2 2 6 +2 1 4 2 2 7 +2 1 5 2 1 4 +2 1 5 2 1 5 +2 1 5 2 2 6 +2 1 5 2 2 7 +2 1 6 2 1 4 +2 1 6 2 1 5 +2 1 6 2 2 6 +2 1 6 2 2 7 + +query IIIIII colnames +SELECT * FROM xyz INNER HASH JOIN uvw ON x = u ORDER BY x, y, z, u, v, w +---- +x y z u v w +1 1 1 1 1 1 +1 1 1 1 2 2 +1 1 1 1 2 3 +1 1 2 1 1 1 +1 1 2 1 2 2 +1 1 2 1 2 3 +1 2 3 1 1 1 +1 2 3 1 2 2 +1 2 3 1 2 3 +2 1 4 2 1 4 +2 1 4 2 1 5 +2 1 4 2 2 6 +2 1 4 2 2 7 +2 1 5 2 1 4 +2 1 5 2 1 5 +2 1 5 2 2 6 +2 1 5 2 2 7 +2 1 6 2 1 4 +2 1 6 2 1 5 +2 1 6 2 2 6 +2 1 6 2 2 7 + +query IIIIII colnames +SELECT * FROM xyz INNER LOOKUP JOIN uvw ON x = u AND y = v ORDER BY u, x, v, y, z, w +---- +x y z u v w +1 1 1 1 1 1 +1 1 2 1 1 1 +1 2 3 1 2 2 +1 2 3 1 2 3 +2 1 4 2 1 4 +2 1 4 2 1 5 +2 1 5 2 1 4 +2 1 5 2 1 5 +2 1 6 2 1 4 +2 1 6 2 1 5 + +query IIIIII colnames +SELECT * FROM xyz INNER HASH JOIN uvw ON x = u AND y = v ORDER BY u, x, v, y, z, w +---- +x y z u v w +1 1 1 1 1 1 +1 1 2 1 1 1 +1 2 3 1 2 2 +1 2 3 1 2 3 +2 1 4 2 1 4 +2 1 4 2 1 5 +2 1 5 2 1 4 +2 1 5 2 1 5 +2 1 6 2 1 4 +2 1 6 2 1 5 diff --git a/pkg/sql/opt/ordering/interesting_orderings.go b/pkg/sql/opt/ordering/interesting_orderings.go index ad9541dac156..af5c90431e5e 100644 --- a/pkg/sql/opt/ordering/interesting_orderings.go +++ b/pkg/sql/opt/ordering/interesting_orderings.go @@ -179,6 +179,8 @@ func interestingOrderingsForJoin(rel memo.RelExpr) props.OrderingSet { } // For a join, we could conceivably preserve the order of one side (even with // hash-join, depending on which side we store). + // TODO(drewk): add logic for orderings on columns from both sides, since both + // lookup and merge joins can provide them. ordLeft := DeriveInterestingOrderings(rel.Child(0).(memo.RelExpr)) ordRight := DeriveInterestingOrderings(rel.Child(1).(memo.RelExpr)) ord := make(props.OrderingSet, 0, len(ordLeft)+len(ordRight)) diff --git a/pkg/sql/opt/ordering/lookup_join.go b/pkg/sql/opt/ordering/lookup_join.go index bd8497c62df0..cf295c7bd174 100644 --- a/pkg/sql/opt/ordering/lookup_join.go +++ b/pkg/sql/opt/ordering/lookup_join.go @@ -16,14 +16,22 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/opt/props" ) -func lookupOrIndexJoinCanProvideOrdering(expr memo.RelExpr, required *props.OrderingChoice) bool { - // LookupJoin and IndexJoin can pass through their ordering if the ordering - // depends only on columns present in the input. +func indexJoinCanProvideOrdering(expr memo.RelExpr, required *props.OrderingChoice) bool { + // IndexJoin can pass through its ordering if the ordering depends only on + // columns present in the input. inputCols := expr.Child(0).(memo.RelExpr).Relational().OutputCols - canProjectCols := required.CanProjectCols(inputCols) + return required.CanProjectCols(inputCols) +} + +func lookupJoinCanProvideOrdering(expr memo.RelExpr, required *props.OrderingChoice) bool { + lookupJoin := expr.(*memo.LookupJoinExpr) - if lookupJoin, ok := expr.(*memo.LookupJoinExpr); ok && - canProjectCols && lookupJoin.IsSecondJoinInPairedJoiner { + // LookupJoin can pass through its ordering if the ordering depends only on + // columns present in the input. + inputCols := lookupJoin.Input.Relational().OutputCols + canProjectUsingOnlyInputCols := required.CanProjectCols(inputCols) + + if canProjectUsingOnlyInputCols && lookupJoin.IsSecondJoinInPairedJoiner { // Can only pass through ordering if the ordering can be provided by the // child, since we don't want a sort to be interposed between the child // and this join. @@ -31,7 +39,7 @@ func lookupOrIndexJoinCanProvideOrdering(expr memo.RelExpr, required *props.Orde // We may need to remove ordering columns that are not output by the input // expression. This results in an equivalent ordering, but with fewer // options in the OrderingChoice. - child := expr.Child(0).(memo.RelExpr) + child := lookupJoin.Input res := projectOrderingToInput(child, required) // It is in principle possible that the lookup join has an ON condition that // forces an equality on two columns in the input. In this case we need to @@ -43,7 +51,34 @@ func lookupOrIndexJoinCanProvideOrdering(expr memo.RelExpr, required *props.Orde res = trimColumnGroups(&res, &child.Relational().FuncDeps) return CanProvide(child, &res) } - return canProjectCols + if !canProjectUsingOnlyInputCols { + // It is not possible to serve the required ordering using only input + // columns. However, the lookup join may be able to satisfy the required + // ordering by appending index columns to the input ordering. See the + // getLookupOrdCols comment for more information. + // + // Iterate through the prefix of the required columns that can project input + // columns, and set up to test whether the input ordering can be extended + // with index columns. + var remainingRequired props.OrderingChoice + canProjectPrefixCols := required.Optional.Copy() + for i := range required.Columns { + if !required.Columns[i].Group.Intersects(inputCols) { + // We have reached the end of the prefix of the required ordering that + // can be projected by input columns. Keep track of the rest of the + // columns that cannot be satisfied by an ordering on the input. + remainingRequired.Columns = required.Columns[i:] + remainingRequired.Optional = required.Optional.Copy() + break + } + canProjectPrefixCols.UnionWith(required.Columns[i].Group) + } + // Check whether appending index columns to the input ordering would satisfy + // the required ordering. + _, ok := getLookupOrdCols(lookupJoin, &remainingRequired, canProjectPrefixCols) + return ok + } + return canProjectUsingOnlyInputCols } func lookupOrIndexJoinBuildChildReqOrdering( @@ -52,10 +87,25 @@ func lookupOrIndexJoinBuildChildReqOrdering( if childIdx != 0 { return props.OrderingChoice{} } + child := parent.Child(0).(memo.RelExpr) + + if _, ok := parent.(*memo.LookupJoinExpr); ok { + // We need to truncate the ordering columns to the prefix of those that can + // project input columns without interleaving with index columns (see + // maybeAddLookupOrdCols in lookupJoinBuildProvided). + for i := range required.Columns { + if !required.Columns[i].Group.Intersects(child.Relational().OutputCols) { + // Shallow-copy the required properties and reslice the copy. + newRequired := *required + required = &newRequired + required.Columns = required.Columns[:i] + break + } + } + } // We may need to remove ordering columns that are not output by the input // expression. - child := parent.Child(0).(memo.RelExpr) res := projectOrderingToInput(child, required) // It is in principle possible that the lookup join has an ON condition that // forces an equality on two columns in the input. In this case we need to @@ -93,21 +143,34 @@ func indexJoinBuildProvided(expr memo.RelExpr, required *props.OrderingChoice) o func lookupJoinBuildProvided(expr memo.RelExpr, required *props.OrderingChoice) opt.Ordering { lookupJoin := expr.(*memo.LookupJoinExpr) - childProvided := lookupJoin.Input.ProvidedPhysical().Ordering + provided := lookupJoin.Input.ProvidedPhysical().Ordering + + var toExtend *props.OrderingChoice + if provided, toExtend = trySatisfyRequired(required, provided); toExtend != nil { + // The input provided ordering cannot satisfy the required ordering. It may + // be possible to append lookup columns in order to do so. See the + // getLookupOrdColumns for more details. + if lookupProvided, ok := getLookupOrdCols(lookupJoin, toExtend, provided.ColSet()); ok { + newProvided := make(opt.Ordering, len(provided)+len(lookupProvided)) + copy(newProvided, provided) + copy(newProvided[len(provided):], lookupProvided) + provided = newProvided + } + } // The lookup join includes an implicit projection (lookupJoin.Cols); some of // the input columns might not be output columns so we may need to remap them. // First check if we need to. needsRemap := false - for i := range childProvided { - if !lookupJoin.Cols.Contains(childProvided[i].ID()) { + for i := range provided { + if !lookupJoin.Cols.Contains(provided[i].ID()) { needsRemap = true break } } if !needsRemap { // Fast path: we don't need to remap any columns. - return childProvided + return provided } // Because of the implicit projection, the FDs of the LookupJoin don't include @@ -128,5 +191,134 @@ func lookupJoinBuildProvided(expr memo.RelExpr, required *props.OrderingChoice) fds.AddFrom(&filterProps.FuncDeps) } - return remapProvided(childProvided, &fds, lookupJoin.Cols) + return remapProvided(provided, &fds, lookupJoin.Cols) +} + +// Lookup joins can maintain the index ordering on each lookup, in order to +// return results in index order for each input row. getLookupOrdCols checks +// whether it is possible to append index columns to the input ordering in order +// to satisfy the required ordering. If it is possible, getLookupOrdCols returns +// the index ordering columns that should be appended to the input ordering and +// ok is true. Otherwise, ok is false. +// +// It is possible for a lookup join to supply an ordering that references index +// columns if ordering consists of a series of input columns that form a key +// over the input, followed by the index columns in index order. Due to +// implementation details, currently the ordering columns from the index must be +// ASC. The following is a case where a lookup join could maintain an ordering +// over both input and index columns: +// CREATE TABLE ab (a INT, b INT, PRIMARY KEY(a, b)); +// CREATE TABLE xyz (x INT, y INT, z INT, PRIMARY KEY(x, y, z DESC)); +// SELECT * FROM ab INNER LOOKUP JOIN xy ON a = x ORDER BY a, b, x, y; +// Note that in this example the 'a' and 'b' columns form a key over the +// input of the lookup join. Additionally, the 'x' column alone is not a key +// for the 'xy' table, so each lookup may return multiple rows (which need +// to be ordered among themselves). Since the postfix of the ordering that +// references index columns is in index order (x, y) and has no DESC +// columns, the lookup join in the example can supply the ordering itself. +// On the other hand, switching 'b' and 'y' in the ordering, removing 'b', +// or adding the 'z' column to the required order would mean the query would +// require a sort. +// +// Note that the Columns field of the required OrderingChoice should reflect the +// postfix of the required ordering that cannot be satisfied by input columns, +// rather than the entire required ordering. The inputOrderingColumns argument +// is the set of columns referenced by the input ordering. +func getLookupOrdCols( + lookupJoin *memo.LookupJoinExpr, required *props.OrderingChoice, inputOrderingCols opt.ColSet, +) (lookupOrdering opt.Ordering, ok bool) { + if !lookupJoin.Input.Relational().FuncDeps.ColsAreStrictKey(inputOrderingCols) { + // Ensure that the ordering forms a key over the input columns. Lookup + // joins can only maintain the index ordering for each individual input + // row, so we need to disallow cases where different input rows may sort + // the same on the input ordering. + // TODO(drewk): it is possible to take advantage of the index ordering + // when the input ordering does not form a key over the input. In this + // case, we would require that the index ordering columns for a given + // input row are functionally determined by the input ordering columns. + // This would disqualify IN constraints and inequalities. + return nil, false + } + // The columns from the prefix of the required ordering satisfied by the + // input are considered optional for the index ordering. + optionalCols := required.Optional.Union(inputOrderingCols) + + // Build an ordering that represents the index. + joinFDs := &lookupJoin.Relational().FuncDeps + idx := lookupJoin.Memo().Metadata().Table(lookupJoin.Table).Index(lookupJoin.Index) + indexOrder := make(opt.Ordering, 0, idx.KeyColumnCount()) + requiredCols := required.ColSet() + for i := 0; i < idx.KeyColumnCount(); i++ { + idxColID := lookupJoin.Table.ColumnID(idx.Column(i).Ordinal()) + if optionalCols.Contains(idxColID) { + // Don't try to include optional columns. + continue + } + if i < len(lookupJoin.KeyCols) && optionalCols.Contains(lookupJoin.KeyCols[i]) { + // The index column is equivalent to an optional column. Add both to the + // optional columns set and compute its closure. + optionalCols.Add(lookupJoin.KeyCols[i]) + optionalCols.Add(idxColID) + optionalCols = joinFDs.ComputeClosure(optionalCols) + continue + } + if !requiredCols.Contains(idxColID) { + // This index column is not part of the required ordering. It is possible + // that the prefix of the index ordering we have reached so far can + // satisfy the required ordering, so break instead of returning. + break + } + if idx.Column(i).Descending { + // The index ordering columns must be ASC in order for lookups to be + // returned in index order. + return nil, false + } + indexOrder = append(indexOrder, opt.MakeOrderingColumn(idxColID, idx.Column(i).Descending)) + } + // Check if the index ordering satisfies the postfix of the required + // ordering that cannot be satisfied by the input. + indexOrder, remaining := trySatisfyRequired(required, indexOrder) + return indexOrder, remaining == nil +} + +// trySatisfyRequired returns a prefix of the given provided Ordering that is +// compatible with the required ordering (e.g. all columns either line up with +// the required columns or are optional), as well as an OrderingChoice that +// indicates how the prefix needs to be extended in order to imply the required +// OrderingChoice. If the prefix satisfies the required props, toExtend will be +// nil. The returned fields reference the slices of the inputs, so they are not +// safe to mutate. +func trySatisfyRequired( + required *props.OrderingChoice, provided opt.Ordering, +) (prefix opt.Ordering, toExtend *props.OrderingChoice) { + var requiredIdx, providedIdx int + for requiredIdx < len(required.Columns) && providedIdx < len(provided) { + requiredCol, providedCol := required.Columns[requiredIdx], provided[providedIdx] + if required.Optional.Contains(providedCol.ID()) { + // Skip optional columns. + providedIdx++ + continue + } + if !requiredCol.Group.Contains(providedCol.ID()) || + requiredCol.Descending != providedCol.Descending() { + // The provided ordering columns must either line up with the + // OrderingChoice columns, or be part of the optional column set. + // Additionally, the provided ordering must have the same column + // directions as the OrderingChoice. + break + } + // The current OrderingChoice and provided columns match up. + requiredIdx++ + providedIdx++ + } + if providedIdx > 0 { + prefix = provided[:providedIdx] + } + if requiredIdx < len(required.Columns) { + toExtend = &props.OrderingChoice{ + Optional: required.Optional, + Columns: required.Columns[requiredIdx:], + } + } + return prefix, toExtend } diff --git a/pkg/sql/opt/ordering/lookup_join_test.go b/pkg/sql/opt/ordering/lookup_join_test.go index 0e711e5ed7f7..5567aedc41b4 100644 --- a/pkg/sql/opt/ordering/lookup_join_test.go +++ b/pkg/sql/opt/ordering/lookup_join_test.go @@ -29,9 +29,13 @@ import ( func TestLookupJoinProvided(t *testing.T) { tc := testcat.New() - if _, err := tc.ExecuteDDL( - "CREATE TABLE t (c1 INT, c2 INT, c3 INT, c4 INT, PRIMARY KEY(c1, c2))", - ); err != nil { + if _, err := tc.ExecuteDDL(` + CREATE TABLE t ( + c1 INT, c2 INT, c3 INT, c4 INT, + PRIMARY KEY(c1, c2), + INDEX desc_idx(c1, c2 DESC) STORING(c3, c4) + ) + `); err != nil { t.Fatal(err) } st := cluster.MakeTestingClusterSettings() @@ -46,58 +50,114 @@ func TestLookupJoinProvided(t *testing.T) { t.Fatalf("unexpected ID for column c1: %d\n", c1) } + const descendingIndex = 1 + idxName := md.Table(tab).Index(descendingIndex).Name() + if idxName != "desc_idx" { + t.Fatalf("unexpected index: %s, expected desc_idx", idxName) + } + c := func(cols ...opt.ColumnID) opt.ColSet { return opt.MakeColSet(cols...) } testCases := []struct { + index cat.IndexOrdinal keyCols opt.ColList + inputKey opt.ColSet outCols opt.ColSet required string input string provided string }{ // In these tests, the input (left side of the join) has columns 5,6 and the - // table (right side) has columns 1,2,3,4 and the join has condition - // (c5, c6) = (c1, c2). + // table (right side) has columns 1,2,3,4. // { // case 1: the lookup join adds columns 3,4 from the table and retains the - // input columns. + // input columns. Joining on (c1, c2) = (c5, c6). keyCols: opt.ColList{5, 6}, - outCols: c(3, 4, 5, 6), + inputKey: c(5, 6), + outCols: c(3, 4, 5, 6, 7, 8), required: "+5,+6", input: "+5,+6", provided: "+5,+6", }, { // case 2: the lookup join produces all columns. The provided ordering - // on 5,6 is equivalent to an ordering on 1,2. + // on 5,6 is equivalent to an ordering on 1,2. Joining on (c1, c2) = + // (c5, c6). keyCols: opt.ColList{5, 6}, - outCols: c(1, 2, 3, 4, 5, 6), - required: "-1,+2", + inputKey: c(5, 6), + outCols: c(1, 2, 3, 4, 5, 6, 7, 8), + required: "-(1|5),+(2|6)", input: "-5,+6", provided: "-5,+6", }, { // case 3: the lookup join does not produce input columns 5,6; we must // remap the input ordering to refer to output columns 1,2 instead. + // Joining on (c1, c2) = (c5, c6). keyCols: opt.ColList{5, 6}, + inputKey: c(5, 6), outCols: c(1, 2, 3, 4), - required: "+1,-2", + required: "+(1|5),-(2|6)", input: "+5,-6", provided: "+1,-2", }, { // case 4: a hybrid of the two cases above (we need to remap column 6). + // Joining on (c1, c2) = (c5, c6). keyCols: opt.ColList{5, 6}, + inputKey: c(5, 6), outCols: c(1, 2, 3, 4, 5), - required: "-1,-2", + required: "-(1|5),-(2|6)", input: "-5,-6", provided: "-5,-2", }, + { // case 5: the lookup join adds column c2 as an ordering column. Joining + // on c1 = c5. + keyCols: opt.ColList{5}, + inputKey: c(5, 6), + outCols: c(2, 3, 4, 5, 6), + required: "+(1|5),+6,+2", + input: "+5,+6", + provided: "+5,+6,+2", + }, + { // case 6: the lookup join outputs all columns and adds column c2 as an + // ordering column. Joining on c1 = c6. + keyCols: opt.ColList{6}, + inputKey: c(6), + outCols: c(1, 2, 3, 4, 5, 6), + required: "-5,+(1|6),+2", + input: "-5,+6", + provided: "-5,+6,+2", + }, + { // case 7: the lookup join does not produce input columns 5,6; we must + // remap the input ordering to refer to output column 1 instead. + keyCols: opt.ColList{5}, + inputKey: c(5), + outCols: c(1, 2, 3, 4), + required: "+(1|5),+2", + input: "+5", + provided: "+1,+2", + }, + { // case 8: the lookup join preserves the input ordering but cannot provide + // the entire required ordering because the index has a descending column. + index: descendingIndex, + keyCols: opt.ColList{5}, + inputKey: c(5, 6), + outCols: c(2, 3, 4, 5, 6), + required: "+(1|5),+6,-2", + input: "+5,+6", + provided: "+5,+6", + }, } for tcIdx, tc := range testCases { t.Run(fmt.Sprintf("case%d", tcIdx+1), func(t *testing.T) { + inputFDs := props.FuncDepSet{} + inputFDs.AddStrictKey(tc.inputKey, c(5, 6)) input := &testexpr.Instance{ - Rel: &props.Relational{}, + Rel: &props.Relational{ + OutputCols: c(5, 6), + FuncDeps: inputFDs, + }, Provided: &physical.Provided{ Ordering: props.ParseOrdering(tc.input), }, @@ -108,7 +168,7 @@ func TestLookupJoinProvided(t *testing.T) { &memo.LookupJoinPrivate{ JoinType: opt.InnerJoinOp, Table: tab, - Index: cat.PrimaryIndex, + Index: tc.index, KeyCols: tc.keyCols, Cols: tc.outCols, }, @@ -121,3 +181,271 @@ func TestLookupJoinProvided(t *testing.T) { }) } } + +func TestLookupJoinCanProvide(t *testing.T) { + tc := testcat.New() + if _, err := tc.ExecuteDDL(` + CREATE TABLE t ( + c1 INT, c2 INT, c3 INT, c4 INT, + PRIMARY KEY(c1, c2), + INDEX sec_idx(c3, c4, c1, c2), + INDEX desc_idx(c1, c2 DESC) STORING(c3, c4) + ) + `); err != nil { + t.Fatal(err) + } + st := cluster.MakeTestingClusterSettings() + evalCtx := eval.NewTestingEvalContext(st) + var f norm.Factory + f.Init(evalCtx, tc) + md := f.Metadata() + tn := tree.NewUnqualifiedTableName("t") + tab := md.AddTable(tc.Table(tn), tn) + + if c1 := tab.ColumnID(0); c1 != 1 { + t.Fatalf("unexpected ID for column c1: %d\n", c1) + } + + const secondaryIndex, descendingIndex = 1, 2 + idxName := md.Table(tab).Index(secondaryIndex).Name() + if idxName != "sec_idx" { + t.Fatalf("unexpected index: %s, expected sec_idx", idxName) + } + idxName = md.Table(tab).Index(descendingIndex).Name() + if idxName != "desc_idx" { + t.Fatalf("unexpected index: %s, expected desc_idx", idxName) + } + + c := opt.MakeColSet + + errorString := func(canProvide bool) string { + if canProvide { + return "expected to be able to provide %s" + } + return "expected not to be able to provide %s" + } + + testCases := []struct { + idx cat.IndexOrdinal + keyCols opt.ColList + outCols opt.ColSet + inputKey opt.ColSet + required string + canProvide bool + }{ + // In these tests, the input (left side of the join) has columns 5,6 and the + // table (right side) has columns 1,2,3,4. + // + { // Case 1: the ordering can project input columns. + keyCols: opt.ColList{5}, + outCols: c(1, 5, 6), + inputKey: c(5, 6), + required: "+(1|5),+6", + canProvide: true, + }, + { // Case 2: the ordering can project input columns. + keyCols: opt.ColList{5, 6}, + outCols: c(1, 2, 5, 6), + inputKey: c(5, 6), + required: "+(1|5),+(2|6)", + canProvide: true, + }, + { // Case 3: the ordering cannot project only input columns, but the lookup + // can maintain the ordering on both input and lookup columns. + keyCols: opt.ColList{5}, + outCols: c(1, 2, 5, 6), + inputKey: c(5, 6), + required: "+(1|5),+6,+2", + canProvide: true, + }, + { // Case 4: the ordering cannot project only input columns, but the lookup + // can maintain the ordering on both input and lookup columns. + idx: secondaryIndex, + keyCols: opt.ColList{5}, + outCols: c(1, 2, 3, 4, 5, 6), + inputKey: c(5), + required: "+(3|5),+4", + canProvide: true, + }, + { // Case 5: the ordering cannot project only input columns, but the lookup + // can maintain the ordering on both input and lookup columns. + idx: secondaryIndex, + keyCols: opt.ColList{5}, + outCols: c(1, 2, 5, 6), + inputKey: c(5, 6), + required: "+(3|5),+6,+4,+2 opt(1)", + canProvide: true, + }, + { // Case 6: the ordering cannot be satisfied because the input and lookup + // ordering columns are interleaved. + keyCols: opt.ColList{5}, + outCols: c(1, 2, 5, 6), + inputKey: c(5, 6), + required: "+(1|5),+2,+6", + canProvide: false, + }, + { // Case 7: the ordering cannot be satisfied because the input ordering + // columns do not form a key over the input. + keyCols: opt.ColList{5}, + outCols: c(1, 2, 5, 6), + inputKey: c(6), + required: "+(1|5),+2", + canProvide: false, + }, + { // Case 8: the ordering cannot be satisfied because the lookup ordering + // involves columns that are not part of the index. + keyCols: opt.ColList{5, 6}, + outCols: c(1, 3, 5, 6), + inputKey: c(5, 6), + required: "+(1|5),+6,+3", + canProvide: false, + }, + { // Case 9: the ordering cannot be satisfied because the lookup ordering + // columns are not in index order. + idx: secondaryIndex, + keyCols: opt.ColList{5}, + outCols: c(1, 2, 3, 4, 5, 6), + inputKey: c(5), + required: "+(3|5),+1,+4", + canProvide: false, + }, + { // Case 10: the ordering cannot be satisfied because one of the lookup + // ordering columns is sorted in the wrong direction. + keyCols: opt.ColList{5}, + outCols: c(1, 2, 5, 6), + inputKey: c(5, 6), + required: "+(1|5),+6,-2", + canProvide: false, + }, + { // Case 11: the ordering cannot be satisfied because the lookup index has + // a descending column. + idx: descendingIndex, + keyCols: opt.ColList{5}, + outCols: c(1, 2, 5, 6), + inputKey: c(5, 6), + required: "+(1|5),+6,-2", + canProvide: false, + }, + { // Case 12: the ordering cannot be satisfied because the required ordering + // is missing index column c1. + idx: secondaryIndex, + keyCols: opt.ColList{5}, + outCols: c(1, 2, 5, 6), + inputKey: c(5, 6), + required: "+(3|5),+6,+4,+2", + canProvide: false, + }, + } + + for tcIdx, tc := range testCases { + t.Run(fmt.Sprintf("case%d", tcIdx+1), func(t *testing.T) { + fds := props.FuncDepSet{} + fds.AddStrictKey(tc.inputKey, c(5, 6)) + input := &testexpr.Instance{ + Rel: &props.Relational{ + OutputCols: c(5, 6), + FuncDeps: fds, + }, + } + lookupJoin := f.Memo().MemoizeLookupJoin( + input, + nil, /* FiltersExpr */ + &memo.LookupJoinPrivate{ + JoinType: opt.InnerJoinOp, + Table: tab, + Index: tc.idx, + KeyCols: tc.keyCols, + Cols: tc.outCols, + }, + ) + req := props.ParseOrderingChoice(tc.required) + canProvide := lookupJoinCanProvideOrdering(lookupJoin, &req) + if canProvide != tc.canProvide { + t.Errorf(errorString(tc.canProvide), req) + } + }) + } +} + +func TestTrySatisfyRequired(t *testing.T) { + testCases := []struct { + required string + provided string + prefix string + toExtend string + }{ + { // Case 1: required ordering is prefix of provided. + required: "+1,+2,+3", + provided: "+1,+2,+3,+4", + prefix: "+1,+2,+3", + toExtend: "", + }, + { // Case 2: required ordering is empty. + required: "", + provided: "+1,-2", + prefix: "", + toExtend: "", + }, + { // Case 3: provided ordering includes optional columns. + required: "+1,+2,+3 opt(4,5)", + provided: "+1,-4,+2,+5,+3", + prefix: "+1,-4,+2,+5,+3", + toExtend: "", + }, + { // Case 4: required ordering includes equivalent columns. + required: "+(1|4),-(2|5),+3", + provided: "+1,-2,+3", + prefix: "+1,-2,+3", + toExtend: "", + }, + { // Case 5: provided ordering is prefix of required. + required: "+1,+2,+3", + provided: "+1,+2", + prefix: "+1,+2", + toExtend: "+3", + }, + { // Case 6: provided ordering has non-optional columns between required + // columns. + required: "+1,+2,+3", + provided: "+1,+2,+4,+3", + prefix: "+1,+2", + toExtend: "+3", + }, + { // Case 7: provided ordering column is in the wrong direction. + required: "+1,+2,+3", + provided: "+1,-2,+3", + prefix: "+1", + toExtend: "+2,+3", + }, + { // Case 8: provided ordering is empty and required is non-empty. + required: "+1", + provided: "", + prefix: "", + toExtend: "+1", + }, + } + + expect := func(exp, got string) { + t.Helper() + if got != exp { + t.Errorf("expected %s; got %s", exp, got) + } + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("case%d", i+1), func(t *testing.T) { + required := props.ParseOrderingChoice(tc.required) + provided := props.ParseOrdering(tc.provided) + prefix, toExtend := trySatisfyRequired(&required, provided) + prefixString, toExtendString := "", "" + if prefix != nil { + prefixString = prefix.String() + } + if toExtend != nil { + toExtendString = toExtend.String() + } + expect(tc.prefix, prefixString) + expect(tc.toExtend, toExtendString) + }) + } +} diff --git a/pkg/sql/opt/ordering/ordering.go b/pkg/sql/opt/ordering/ordering.go index 8ff5666fec5f..2795bff98caf 100644 --- a/pkg/sql/opt/ordering/ordering.go +++ b/pkg/sql/opt/ordering/ordering.go @@ -140,12 +140,12 @@ func init() { buildProvidedOrdering: setOpBuildProvided, } funcMap[opt.IndexJoinOp] = funcs{ - canProvideOrdering: lookupOrIndexJoinCanProvideOrdering, + canProvideOrdering: indexJoinCanProvideOrdering, buildChildReqOrdering: lookupOrIndexJoinBuildChildReqOrdering, buildProvidedOrdering: indexJoinBuildProvided, } funcMap[opt.LookupJoinOp] = funcs{ - canProvideOrdering: lookupOrIndexJoinCanProvideOrdering, + canProvideOrdering: lookupJoinCanProvideOrdering, buildChildReqOrdering: lookupOrIndexJoinBuildChildReqOrdering, buildProvidedOrdering: lookupJoinBuildProvided, } diff --git a/pkg/sql/opt/xform/testdata/external/tpce b/pkg/sql/opt/xform/testdata/external/tpce index eaa01b702316..664e2415d030 100644 --- a/pkg/sql/opt/xform/testdata/external/tpce +++ b/pkg/sql/opt/xform/testdata/external/tpce @@ -4024,9 +4024,10 @@ project │ ├── cardinality: [0 - 1] │ ├── key: () │ ├── fd: ()-->(1,8,27,30,31,45-50), (47)==(31), (8)==(45), (45)==(8), (31)==(47) - │ ├── inner-join (hash) + │ ├── inner-join (merge) │ │ ├── columns: c_id:1!null c_tier:8!null s_symb:27!null s_name:30!null s_ex_id:31!null cr_c_tier:45!null cr_tt_id:46!null cr_ex_id:47!null cr_from_qty:48!null cr_to_qty:49!null commission_rate.cr_rate:50!null - │ │ ├── multiplicity: left-rows(zero-or-one), right-rows(zero-or-more) + │ │ ├── left ordering: +45 + │ │ ├── right ordering: +8 │ │ ├── key: (48) │ │ ├── fd: ()-->(1,8,27,30,31,45-47), (48)-->(49,50), (31)==(47), (47)==(31), (8)==(45), (45)==(8) │ │ ├── limit hint: 1.00 @@ -4040,6 +4041,7 @@ project │ │ │ │ └── cr_from_qty:48 <= 100 [outer=(48), constraints=(/48: (/NULL - /100]; tight)] │ │ │ ├── key: (45,48) │ │ │ ├── fd: ()-->(27,30,31,46,47), (45,48)-->(49,50), (31)==(47), (47)==(31) + │ │ │ ├── ordering: +45 opt(27,30,31,46,47) [actual: +45] │ │ │ ├── project │ │ │ │ ├── columns: "lookup_join_const_col_@46":54!null s_symb:27!null s_name:30!null s_ex_id:31!null │ │ │ │ ├── cardinality: [0 - 1] @@ -4061,8 +4063,7 @@ project │ │ │ ├── cardinality: [0 - 1] │ │ │ ├── key: () │ │ │ └── fd: ()-->(1,8) - │ │ └── filters - │ │ └── cr_c_tier:45 = c_tier:8 [outer=(8,45), constraints=(/8: (/NULL - ]; /45: (/NULL - ]), fd=(8)==(45), (45)==(8)] + │ │ └── filters (true) │ └── 1 └── projections └── commission_rate.cr_rate:50::FLOAT8 [as=cr_rate:53, outer=(50), immutable] diff --git a/pkg/sql/opt/xform/testdata/external/tpce-no-stats b/pkg/sql/opt/xform/testdata/external/tpce-no-stats index ad0fae5d1bdc..1c353bb3715b 100644 --- a/pkg/sql/opt/xform/testdata/external/tpce-no-stats +++ b/pkg/sql/opt/xform/testdata/external/tpce-no-stats @@ -4055,9 +4055,10 @@ project │ ├── cardinality: [0 - 1] │ ├── key: () │ ├── fd: ()-->(1,8,27,30,31,45-50), (47)==(31), (8)==(45), (45)==(8), (31)==(47) - │ ├── inner-join (hash) + │ ├── inner-join (merge) │ │ ├── columns: c_id:1!null c_tier:8!null s_symb:27!null s_name:30!null s_ex_id:31!null cr_c_tier:45!null cr_tt_id:46!null cr_ex_id:47!null cr_from_qty:48!null cr_to_qty:49!null commission_rate.cr_rate:50!null - │ │ ├── multiplicity: left-rows(zero-or-one), right-rows(zero-or-more) + │ │ ├── left ordering: +47 + │ │ ├── right ordering: +31 │ │ ├── key: (48) │ │ ├── fd: ()-->(1,8,27,30,31,45-47), (48)-->(49,50), (31)==(47), (47)==(31), (8)==(45), (45)==(8) │ │ ├── limit hint: 1.00 @@ -4066,6 +4067,7 @@ project │ │ │ ├── key columns: [8 56] = [45 46] │ │ │ ├── key: (47,48) │ │ │ ├── fd: ()-->(1,8,45,46), (47,48)-->(49,50), (8)==(45), (45)==(8) + │ │ │ ├── ordering: +47 opt(1,8,45,46) [actual: +47] │ │ │ ├── project │ │ │ │ ├── columns: "lookup_join_const_col_@46":56!null c_id:1!null c_tier:8!null │ │ │ │ ├── cardinality: [0 - 1] @@ -4088,8 +4090,7 @@ project │ │ │ ├── cardinality: [0 - 1] │ │ │ ├── key: () │ │ │ └── fd: ()-->(27,30,31) - │ │ └── filters - │ │ └── cr_ex_id:47 = s_ex_id:31 [outer=(31,47), constraints=(/31: (/NULL - ]; /47: (/NULL - ]), fd=(31)==(47), (47)==(31)] + │ │ └── filters (true) │ └── 1 └── projections └── commission_rate.cr_rate:50::FLOAT8 [as=cr_rate:53, outer=(50), immutable] @@ -5370,27 +5371,23 @@ project │ │ │ │ ├── fd: ()-->(6,24,27), (1)-->(2,4,5,7,9-11), (18)-->(19), (4)==(18), (18)==(4), (6)==(24), (24)==(6) │ │ │ │ ├── ordering: +2 opt(6,24,27) [actual: +2] │ │ │ │ ├── limit hint: 50.00 - │ │ │ │ ├── sort + │ │ │ │ ├── inner-join (lookup trade@trade_t_s_symb_t_dts_idx) │ │ │ │ │ ├── columns: t_id:1!null t_dts:2!null t_tt_id:4!null t_is_cash:5!null t_s_symb:6!null t_qty:7!null t_ca_id:9!null t_exec_name:10!null trade.t_trade_price:11 s_symb:24!null s_name:27!null + │ │ │ │ │ ├── lookup expression + │ │ │ │ │ │ └── filters + │ │ │ │ │ │ ├── s_symb:24 = t_s_symb:6 [outer=(6,24), constraints=(/6: (/NULL - ]; /24: (/NULL - ]), fd=(6)==(24), (24)==(6)] + │ │ │ │ │ │ └── (t_dts:2 >= '2020-06-15 22:27:42.148484') AND (t_dts:2 <= '2020-06-20 22:27:42.148484') [outer=(2), constraints=(/2: [/'2020-06-15 22:27:42.148484' - /'2020-06-20 22:27:42.148484']; tight)] │ │ │ │ │ ├── key: (1) │ │ │ │ │ ├── fd: ()-->(6,24,27), (1)-->(2,4,5,7,9-11), (6)==(24), (24)==(6) │ │ │ │ │ ├── ordering: +2 opt(6,24,27) [actual: +2] - │ │ │ │ │ └── inner-join (lookup trade@trade_t_s_symb_t_dts_idx) - │ │ │ │ │ ├── columns: t_id:1!null t_dts:2!null t_tt_id:4!null t_is_cash:5!null t_s_symb:6!null t_qty:7!null t_ca_id:9!null t_exec_name:10!null trade.t_trade_price:11 s_symb:24!null s_name:27!null - │ │ │ │ │ ├── lookup expression - │ │ │ │ │ │ └── filters - │ │ │ │ │ │ ├── s_symb:24 = t_s_symb:6 [outer=(6,24), constraints=(/6: (/NULL - ]; /24: (/NULL - ]), fd=(6)==(24), (24)==(6)] - │ │ │ │ │ │ └── (t_dts:2 >= '2020-06-15 22:27:42.148484') AND (t_dts:2 <= '2020-06-20 22:27:42.148484') [outer=(2), constraints=(/2: [/'2020-06-15 22:27:42.148484' - /'2020-06-20 22:27:42.148484']; tight)] - │ │ │ │ │ ├── key: (1) - │ │ │ │ │ ├── fd: ()-->(6,24,27), (1)-->(2,4,5,7,9-11), (6)==(24), (24)==(6) - │ │ │ │ │ ├── scan security - │ │ │ │ │ │ ├── columns: s_symb:24!null s_name:27!null - │ │ │ │ │ │ ├── constraint: /24: [/'ROACH' - /'ROACH'] - │ │ │ │ │ │ ├── cardinality: [0 - 1] - │ │ │ │ │ │ ├── key: () - │ │ │ │ │ │ └── fd: ()-->(24,27) - │ │ │ │ │ └── filters - │ │ │ │ │ └── t_s_symb:6 = 'ROACH' [outer=(6), constraints=(/6: [/'ROACH' - /'ROACH']; tight), fd=()-->(6)] + │ │ │ │ │ ├── scan security + │ │ │ │ │ │ ├── columns: s_symb:24!null s_name:27!null + │ │ │ │ │ │ ├── constraint: /24: [/'ROACH' - /'ROACH'] + │ │ │ │ │ │ ├── cardinality: [0 - 1] + │ │ │ │ │ │ ├── key: () + │ │ │ │ │ │ └── fd: ()-->(24,27) + │ │ │ │ │ └── filters + │ │ │ │ │ └── t_s_symb:6 = 'ROACH' [outer=(6), constraints=(/6: [/'ROACH' - /'ROACH']; tight), fd=()-->(6)] │ │ │ │ └── filters (true) │ │ │ └── 50 │ │ └── filters (true) diff --git a/pkg/sql/opt/xform/testdata/external/tpch-no-stats b/pkg/sql/opt/xform/testdata/external/tpch-no-stats index 6e61dc648487..57ccf606336b 100644 --- a/pkg/sql/opt/xform/testdata/external/tpch-no-stats +++ b/pkg/sql/opt/xform/testdata/external/tpch-no-stats @@ -160,9 +160,10 @@ project ├── columns: p_partkey:1!null p_mfgr:3!null s_name:13!null s_address:14!null s_phone:16!null s_acctbal:17!null s_comment:18!null ps_partkey:21!null ps_suppkey:22!null ps_supplycost:24!null n_name:29!null min:66!null ├── key: (21,22) ├── fd: (1)-->(3), (21,22)-->(1,3,13,14,16-18,24,29,66), (1)==(21), (21)==(1), (22)-->(13,14,16-18,29), (24)==(66), (66)==(24) - ├── group-by (hash) + ├── group-by (streaming) │ ├── columns: p_partkey:1!null p_mfgr:3!null s_name:13!null s_address:14!null s_phone:16!null s_acctbal:17!null s_comment:18!null ps_partkey:21!null ps_suppkey:22!null ps_supplycost:24!null n_name:29!null min:66!null │ ├── grouping columns: ps_partkey:21!null ps_suppkey:22!null + │ ├── internal-ordering: +(1|21|39),+(12|22) opt(6,35,62) │ ├── key: (21,22) │ ├── fd: (1)-->(3), (21,22)-->(1,3,13,14,16-18,24,29,66), (1)==(21), (21)==(1), (22)-->(13,14,16-18,29) │ ├── inner-join (lookup region) @@ -171,54 +172,64 @@ project │ │ ├── lookup columns are key │ │ ├── key: (22,39,46) │ │ ├── fd: ()-->(6,35,62), (1)-->(3,5), (12)-->(13-18), (21,22)-->(24), (12)==(22), (22)==(12), (28)-->(29,30), (30)==(34), (34)==(30), (15)==(28), (28)==(15), (1)==(21,39), (21)==(1,39), (39,40)-->(42), (46)-->(49), (55)-->(57), (57)==(61), (61)==(57), (49)==(55), (55)==(49), (40)==(46), (46)==(40), (39)==(1,21) + │ │ ├── ordering: +(1|21|39),+(12|22) opt(6,35,62) [actual: +1,+22] │ │ ├── inner-join (lookup nation) │ │ │ ├── columns: p_partkey:1!null p_mfgr:3!null p_type:5!null p_size:6!null s_suppkey:12!null s_name:13!null s_address:14!null s_nationkey:15!null s_phone:16!null s_acctbal:17!null s_comment:18!null ps_partkey:21!null ps_suppkey:22!null ps_supplycost:24!null n_nationkey:28!null n_name:29!null n_regionkey:30!null r_regionkey:34!null r_name:35!null ps_partkey:39!null ps_suppkey:40!null ps_supplycost:42!null s_suppkey:46!null s_nationkey:49!null n_nationkey:55!null n_regionkey:57!null │ │ │ ├── key columns: [49] = [55] │ │ │ ├── lookup columns are key │ │ │ ├── key: (22,39,46) │ │ │ ├── fd: ()-->(6,35), (1)-->(3,5), (28)-->(29,30), (12)-->(13-18), (21,22)-->(24), (39,40)-->(42), (46)-->(49), (55)-->(57), (49)==(55), (55)==(49), (40)==(46), (46)==(40), (21)==(1,39), (39)==(1,21), (12)==(22), (22)==(12), (15)==(28), (28)==(15), (30)==(34), (34)==(30), (1)==(21,39) + │ │ │ ├── ordering: +(1|21|39),+(12|22) opt(6,35) [actual: +1,+22] │ │ │ ├── inner-join (lookup supplier) │ │ │ │ ├── columns: p_partkey:1!null p_mfgr:3!null p_type:5!null p_size:6!null s_suppkey:12!null s_name:13!null s_address:14!null s_nationkey:15!null s_phone:16!null s_acctbal:17!null s_comment:18!null ps_partkey:21!null ps_suppkey:22!null ps_supplycost:24!null n_nationkey:28!null n_name:29!null n_regionkey:30!null r_regionkey:34!null r_name:35!null ps_partkey:39!null ps_suppkey:40!null ps_supplycost:42!null s_suppkey:46!null s_nationkey:49!null │ │ │ │ ├── key columns: [40] = [46] │ │ │ │ ├── lookup columns are key │ │ │ │ ├── key: (22,39,46) │ │ │ │ ├── fd: ()-->(6,35), (1)-->(3,5), (28)-->(29,30), (12)-->(13-18), (21,22)-->(24), (39,40)-->(42), (46)-->(49), (40)==(46), (46)==(40), (21)==(1,39), (39)==(1,21), (12)==(22), (22)==(12), (15)==(28), (28)==(15), (30)==(34), (34)==(30), (1)==(21,39) + │ │ │ │ ├── ordering: +(1|21|39),+(12|22) opt(6,35) [actual: +1,+22] │ │ │ │ ├── inner-join (lookup partsupp) │ │ │ │ │ ├── columns: p_partkey:1!null p_mfgr:3!null p_type:5!null p_size:6!null s_suppkey:12!null s_name:13!null s_address:14!null s_nationkey:15!null s_phone:16!null s_acctbal:17!null s_comment:18!null ps_partkey:21!null ps_suppkey:22!null ps_supplycost:24!null n_nationkey:28!null n_name:29!null n_regionkey:30!null r_regionkey:34!null r_name:35!null ps_partkey:39!null ps_suppkey:40!null ps_supplycost:42!null │ │ │ │ │ ├── key columns: [1] = [39] │ │ │ │ │ ├── key: (22,39,40) │ │ │ │ │ ├── fd: ()-->(6,35), (1)-->(3,5), (28)-->(29,30), (12)-->(13-18), (21,22)-->(24), (39,40)-->(42), (21)==(1,39), (39)==(1,21), (12)==(22), (22)==(12), (15)==(28), (28)==(15), (30)==(34), (34)==(30), (1)==(21,39) + │ │ │ │ │ ├── ordering: +(1|21|39),+(12|22) opt(6,35) [actual: +1,+22] │ │ │ │ │ ├── inner-join (lookup region) │ │ │ │ │ │ ├── columns: p_partkey:1!null p_mfgr:3!null p_type:5!null p_size:6!null s_suppkey:12!null s_name:13!null s_address:14!null s_nationkey:15!null s_phone:16!null s_acctbal:17!null s_comment:18!null ps_partkey:21!null ps_suppkey:22!null ps_supplycost:24!null n_nationkey:28!null n_name:29!null n_regionkey:30!null r_regionkey:34!null r_name:35!null │ │ │ │ │ │ ├── key columns: [30] = [34] │ │ │ │ │ │ ├── lookup columns are key │ │ │ │ │ │ ├── key: (21,22) │ │ │ │ │ │ ├── fd: ()-->(6,35), (1)-->(3,5), (12)-->(13-18), (21,22)-->(24), (12)==(22), (22)==(12), (28)-->(29,30), (30)==(34), (34)==(30), (15)==(28), (28)==(15), (1)==(21), (21)==(1) + │ │ │ │ │ │ ├── ordering: +(1|21),+(12|22) opt(6,35) [actual: +1,+22] │ │ │ │ │ │ ├── inner-join (lookup nation) │ │ │ │ │ │ │ ├── columns: p_partkey:1!null p_mfgr:3!null p_type:5!null p_size:6!null s_suppkey:12!null s_name:13!null s_address:14!null s_nationkey:15!null s_phone:16!null s_acctbal:17!null s_comment:18!null ps_partkey:21!null ps_suppkey:22!null ps_supplycost:24!null n_nationkey:28!null n_name:29!null n_regionkey:30!null │ │ │ │ │ │ │ ├── key columns: [15] = [28] │ │ │ │ │ │ │ ├── lookup columns are key │ │ │ │ │ │ │ ├── key: (21,22) │ │ │ │ │ │ │ ├── fd: ()-->(6), (1)-->(3,5), (21,22)-->(24), (12)-->(13-18), (28)-->(29,30), (15)==(28), (28)==(15), (12)==(22), (22)==(12), (1)==(21), (21)==(1) + │ │ │ │ │ │ │ ├── ordering: +(1|21),+(12|22) opt(6) [actual: +1,+22] │ │ │ │ │ │ │ ├── inner-join (lookup supplier) │ │ │ │ │ │ │ │ ├── columns: p_partkey:1!null p_mfgr:3!null p_type:5!null p_size:6!null s_suppkey:12!null s_name:13!null s_address:14!null s_nationkey:15!null s_phone:16!null s_acctbal:17!null s_comment:18!null ps_partkey:21!null ps_suppkey:22!null ps_supplycost:24!null │ │ │ │ │ │ │ │ ├── key columns: [22] = [12] │ │ │ │ │ │ │ │ ├── lookup columns are key │ │ │ │ │ │ │ │ ├── key: (21,22) │ │ │ │ │ │ │ │ ├── fd: ()-->(6), (1)-->(3,5), (12)-->(13-18), (21,22)-->(24), (12)==(22), (22)==(12), (1)==(21), (21)==(1) + │ │ │ │ │ │ │ │ ├── ordering: +(1|21),+(12|22) opt(6) [actual: +1,+22] │ │ │ │ │ │ │ │ ├── inner-join (lookup partsupp) │ │ │ │ │ │ │ │ │ ├── columns: p_partkey:1!null p_mfgr:3!null p_type:5!null p_size:6!null ps_partkey:21!null ps_suppkey:22!null ps_supplycost:24!null │ │ │ │ │ │ │ │ │ ├── key columns: [1] = [21] │ │ │ │ │ │ │ │ │ ├── key: (21,22) │ │ │ │ │ │ │ │ │ ├── fd: ()-->(6), (1)-->(3,5), (21,22)-->(24), (1)==(21), (21)==(1) + │ │ │ │ │ │ │ │ │ ├── ordering: +(1|21),+22 opt(6) [actual: +1,+22] │ │ │ │ │ │ │ │ │ ├── select │ │ │ │ │ │ │ │ │ │ ├── columns: p_partkey:1!null p_mfgr:3!null p_type:5!null p_size:6!null │ │ │ │ │ │ │ │ │ │ ├── key: (1) │ │ │ │ │ │ │ │ │ │ ├── fd: ()-->(6), (1)-->(3,5) + │ │ │ │ │ │ │ │ │ │ ├── ordering: +1 opt(6) [actual: +1] │ │ │ │ │ │ │ │ │ │ ├── scan part │ │ │ │ │ │ │ │ │ │ │ ├── columns: p_partkey:1!null p_mfgr:3!null p_type:5!null p_size:6!null │ │ │ │ │ │ │ │ │ │ │ ├── key: (1) - │ │ │ │ │ │ │ │ │ │ │ └── fd: (1)-->(3,5,6) + │ │ │ │ │ │ │ │ │ │ │ ├── fd: (1)-->(3,5,6) + │ │ │ │ │ │ │ │ │ │ │ └── ordering: +1 │ │ │ │ │ │ │ │ │ │ └── filters │ │ │ │ │ │ │ │ │ │ ├── p_size:6 = 15 [outer=(6), constraints=(/6: [/15 - /15]; tight), fd=()-->(6)] │ │ │ │ │ │ │ │ │ │ └── p_type:5 LIKE '%BRASS' [outer=(5), constraints=(/5: (/NULL - ])] diff --git a/pkg/sql/opt/xform/testdata/physprops/ordering b/pkg/sql/opt/xform/testdata/physprops/ordering index 180e217d1562..460b8e05cb2d 100644 --- a/pkg/sql/opt/xform/testdata/physprops/ordering +++ b/pkg/sql/opt/xform/testdata/physprops/ordering @@ -10,7 +10,7 @@ CREATE TABLE a ---- exec-ddl -CREATE TABLE abc (a INT, b INT, c INT, PRIMARY KEY (a, b, c)) +CREATE TABLE abc (a INT, b INT, c INT, PRIMARY KEY (a, b, c), INDEX abc_desc(a, b, c DESC)) ---- exec-ddl @@ -2599,7 +2599,7 @@ project ├── immutable ├── key: (7) ├── fd: (7)-->(9,11) - ├── ordering: +9,+7 [actual: +9] + ├── ordering: +9,+7 └── limit ├── columns: k:1!null name:2!null x:3!null k:7!null name:8!null x:9!null crdb_internal_mvcc_timestamp:11 ├── internal-ordering: +(3|9),+(1|7) @@ -2607,7 +2607,7 @@ project ├── immutable ├── key: (7) ├── fd: (1)-->(2,3), (7)-->(8,9,11), (2)==(8), (8)==(2), (1)==(7), (7)==(1), (3)==(9), (9)==(3) - ├── ordering: +(3|9),+(1|7) [actual: +3] + ├── ordering: +(3|9),+(1|7) [actual: +3,+7] ├── inner-join (lookup t73968) │ ├── columns: k:1!null name:2!null x:3!null k:7!null name:8!null x:9!null crdb_internal_mvcc_timestamp:11 │ ├── key columns: [7] = [7] @@ -2615,7 +2615,7 @@ project │ ├── immutable │ ├── key: (7) │ ├── fd: (1)-->(2,3), (7)-->(8,9,11), (2)==(8), (8)==(2), (1)==(7), (7)==(1), (3)==(9), (9)==(3) - │ ├── ordering: +(3|9),+(1|7) [actual: +3] + │ ├── ordering: +(3|9),+(1|7) [actual: +3,+7] │ ├── limit hint: 56.00 │ ├── inner-join (lookup t73968@t73968_x_key) │ │ ├── columns: k:1!null name:2 x:3!null k:7!null x:9!null @@ -2662,3 +2662,186 @@ project │ ├── name:2 = name:8 [outer=(2,8), fd=(2)==(8), (8)==(2)] │ └── k:7::STRING = lower(name:8) [outer=(7,8), immutable] └── 56 + +# Preserving lookup ordering (no sort should be added). +opt +SELECT * FROM xyz INNER LOOKUP JOIN abc ON x = a ORDER BY x, y, z, a, b, c +---- +inner-join (lookup abc) + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── flags: force lookup join (into right side) + ├── key columns: [1] = [6] + ├── key: (2,3,6-8) + ├── fd: (1)==(6), (6)==(1) + ├── ordering: +(1|6),+2,+3,+7,+8 [actual: +1,+2,+3,+7,+8] + ├── scan xyz + │ ├── columns: x:1!null y:2!null z:3!null + │ ├── key: (1-3) + │ └── ordering: +1,+2,+3 + └── filters (true) + +# Preserving lookup ordering (no sort should be added). The 'u' and 'v' columns +# are equivalent to input ordering columns and can be considered optional. +opt +SELECT * FROM xyz INNER LOOKUP JOIN abc ON x = a AND y = b ORDER BY a, x, b, y, z, c +---- +inner-join (lookup abc) + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── flags: force lookup join (into right side) + ├── key columns: [1 2] = [6 7] + ├── key: (3,6-8) + ├── fd: (1)==(6), (6)==(1), (2)==(7), (7)==(2) + ├── ordering: +(1|6),+(2|7),+3,+8 [actual: +1,+2,+3,+8] + ├── scan xyz + │ ├── columns: x:1!null y:2!null z:3!null + │ ├── key: (1-3) + │ └── ordering: +1,+2,+3 + └── filters (true) + +# Preserving lookup ordering (no sort should be added). +opt +SELECT * FROM xyz INNER LOOKUP JOIN abc ON x = a ORDER BY x, y, z, b +---- +inner-join (lookup abc) + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── flags: force lookup join (into right side) + ├── key columns: [1] = [6] + ├── key: (2,3,6-8) + ├── fd: (1)==(6), (6)==(1) + ├── ordering: +(1|6),+2,+3,+7 [actual: +1,+2,+3,+7] + ├── scan xyz + │ ├── columns: x:1!null y:2!null z:3!null + │ ├── key: (1-3) + │ └── ordering: +1,+2,+3 + └── filters (true) + +# Can supply the requested ordering because the descending column from the +# index does not take part in the ordering (no sort should be added). +opt +SELECT * FROM xyz INNER LOOKUP JOIN abc@abc_desc ON x = a ORDER BY x, y, z, a, b +---- +inner-join (lookup abc@abc_desc) + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── flags: force lookup join (into right side) + ├── key columns: [1] = [6] + ├── key: (2,3,6-8) + ├── fd: (1)==(6), (6)==(1) + ├── ordering: +(1|6),+2,+3,+7 [actual: +1,+2,+3,+7] + ├── scan xyz + │ ├── columns: x:1!null y:2!null z:3!null + │ ├── key: (1-3) + │ └── ordering: +1,+2,+3 + └── filters (true) + +# Cannot supply requested ordering because input and lookup ordering columns +# are interleaved. +opt +SELECT * FROM xyz INNER LOOKUP JOIN abc ON x = a ORDER BY x, b, y, z, a, c +---- +sort + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── key: (2,3,6-8) + ├── fd: (1)==(6), (6)==(1) + ├── ordering: +(1|6),+7,+2,+3,+8 [actual: +1,+7,+2,+3,+8] + └── inner-join (lookup abc) + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── flags: force lookup join (into right side) + ├── key columns: [1] = [6] + ├── key: (2,3,6-8) + ├── fd: (1)==(6), (6)==(1) + ├── scan xyz + │ ├── columns: x:1!null y:2!null z:3!null + │ └── key: (1-3) + └── filters (true) + +# Cannot supply requested ordering because input ordering columns do not form +# a key. +opt +SELECT * FROM xyz INNER LOOKUP JOIN abc ON x = a ORDER BY x, y, a, b, c +---- +sort (segmented) + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── key: (2,3,6-8) + ├── fd: (1)==(6), (6)==(1) + ├── ordering: +(1|6),+2,+7,+8 [actual: +1,+2,+7,+8] + └── inner-join (lookup abc) + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── flags: force lookup join (into right side) + ├── key columns: [1] = [6] + ├── key: (2,3,6-8) + ├── fd: (1)==(6), (6)==(1) + ├── ordering: +1,+2 + ├── scan xyz + │ ├── columns: x:1!null y:2!null z:3!null + │ ├── key: (1-3) + │ └── ordering: +1,+2 + └── filters (true) + +# Cannot supply requested ordering because lookup ordering columns are not in index +# order. +opt +SELECT * FROM xyz INNER LOOKUP JOIN abc ON x = a ORDER BY x, y, z, a, c, b +---- +sort (segmented) + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── key: (2,3,6-8) + ├── fd: (1)==(6), (6)==(1) + ├── ordering: +(1|6),+2,+3,+8,+7 [actual: +1,+2,+3,+8,+7] + └── inner-join (lookup abc) + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── flags: force lookup join (into right side) + ├── key columns: [1] = [6] + ├── key: (2,3,6-8) + ├── fd: (1)==(6), (6)==(1) + ├── ordering: +1,+2,+3 + ├── scan xyz + │ ├── columns: x:1!null y:2!null z:3!null + │ ├── key: (1-3) + │ └── ordering: +1,+2,+3 + └── filters (true) + +# Cannot supply the requested ordering because the direction of the 'c' column +# is not the same as in the index. +opt +SELECT * FROM xyz INNER LOOKUP JOIN abc ON x = a ORDER BY x, y, z, b, c DESC +---- +sort (segmented) + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── key: (2,3,6-8) + ├── fd: (1)==(6), (6)==(1) + ├── ordering: +(1|6),+2,+3,+7,-8 [actual: +1,+2,+3,+7,-8] + └── inner-join (lookup abc) + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── flags: force lookup join (into right side) + ├── key columns: [1] = [6] + ├── key: (2,3,6-8) + ├── fd: (1)==(6), (6)==(1) + ├── ordering: +1,+2,+3 + ├── scan xyz + │ ├── columns: x:1!null y:2!null z:3!null + │ ├── key: (1-3) + │ └── ordering: +1,+2,+3 + └── filters (true) + +# Cannot supply the requested ordering because the descending column from the +# index shows up in the ordering. +opt +SELECT * FROM xyz INNER LOOKUP JOIN abc@abc_desc ON x = a ORDER BY x, y, z, a, b, c DESC +---- +sort (segmented) + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── key: (2,3,6-8) + ├── fd: (1)==(6), (6)==(1) + ├── ordering: +(1|6),+2,+3,+7,-8 [actual: +1,+2,+3,+7,-8] + └── inner-join (lookup abc@abc_desc) + ├── columns: x:1!null y:2!null z:3!null a:6!null b:7!null c:8!null + ├── flags: force lookup join (into right side) + ├── key columns: [1] = [6] + ├── key: (2,3,6-8) + ├── fd: (1)==(6), (6)==(1) + ├── ordering: +1,+2,+3 + ├── scan xyz + │ ├── columns: x:1!null y:2!null z:3!null + │ ├── key: (1-3) + │ └── ordering: +1,+2,+3 + └── filters (true) diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 4cd3712b530e..b00f79264c10 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -130,13 +130,14 @@ type joinReader struct { budgetAcc mon.BoundAccount // maintainOrdering indicates whether the ordering of the input stream // needs to be maintained AND that we rely on the streamer for that. - // - // Currently this is only the case when joinReader.maintainOrdering is - // true and we are performing an index join. Due to implementation - // details, we don't rely on the streamer for maintaining the ordering - // for lookup joins at the moment (since we still buffer all looked up - // rows and restore the ordering explicitly via the - // joinReaderOrderingStrategy). + // We currently only rely on the streamer in two cases: + // 1. We are performing an index join and joinReader.maintainOrdering is + // true. + // 2. We are performing a lookup join and maintainLookupOrdering is true. + // Except for case (2), we don't rely on the streamer for maintaining + // the ordering for lookup joins due to implementation details (since we + // still buffer all looked up rows and restore the ordering explicitly via + // the joinReaderOrderingStrategy). maintainOrdering bool diskMonitor *mon.BytesMonitor txnKVStreamerMemAcc mon.BoundAccount @@ -312,6 +313,14 @@ func newJoinReader( if flowCtx.EvalCtx.SessionData().ParallelizeMultiKeyLookupJoinsEnabled { shouldLimitBatches = false } + if spec.MaintainLookupOrdering { + // MaintainLookupOrdering indicates the output of the lookup joiner should + // be sorted by , . It doesn't make sense for + // MaintainLookupOrdering to be true when MaintainOrdering is not. + // Additionally, we need to disable parallelism for the traditional fetcher + // in order to ensure the lookups are ordered, so set shouldLimitBatches. + spec.MaintainOrdering, shouldLimitBatches = true, true + } useStreamer, txn, err := flowCtx.UseStreamer() if err != nil { return nil, err @@ -477,7 +486,13 @@ func newJoinReader( jr.streamerInfo.unlimitedMemMonitor.StartNoReserved(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon) jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount() jr.streamerInfo.txnKVStreamerMemAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount() - jr.streamerInfo.maintainOrdering = jr.maintainOrdering && readerType == indexJoinReaderType + // The index joiner can rely on the streamer to maintain the input ordering, + // but the lookup joiner currently handles this logic itself, so the + // streamer can operate in OutOfOrder mode. The exception is when the + // results of each lookup need to be returned in index order - in this case, + // InOrder mode must be used for the streamer. + jr.streamerInfo.maintainOrdering = (jr.maintainOrdering && readerType == indexJoinReaderType) || + spec.MaintainLookupOrdering var diskBuffer kvstreamer.ResultDiskBuffer if jr.streamerInfo.maintainOrdering {