diff --git a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit index 67c60309a01b..ffcdc269310e 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit +++ b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit @@ -8,6 +8,8 @@ CREATE TABLE a (x INT PRIMARY KEY, y INT, z INT, INDEX (y)); CREATE TABLE b (x INT PRIMARY KEY); INSERT INTO a VALUES (1, 1, 1), (2, 1, 1), (3, 2, 2), (4, 2, 2); INSERT INTO b VALUES (1), (2), (3), (4); +CREATE TABLE xy (x INT, y INT, PRIMARY KEY(x, y)); +INSERT INTO xy VALUES (1, 1), (1, 2), (1, 3), (2, 1); # Query with an index join and a limit hint. query T @@ -344,3 +346,103 @@ regions: estimated row count: 1 (100% of the table; stats collected ago) table: a@a_y_idx spans: FULL SCAN (SOFT LIMIT) + +# Query with a lookup join and a limit. The lookup join has to preserve the +# input ordering. +query T +EXPLAIN (OPT, VERBOSE) SELECT a.x, a.y, xy.x, xy.y FROM a INNER LOOKUP JOIN xy ON xy.x = a.x ORDER BY a.y, a.x LIMIT 2 +---- +limit + ├── columns: x:1 y:2 x:6 y:7 + ├── internal-ordering: +2,+(1|6) + ├── cardinality: [0 - 2] + ├── stats: [rows=2] + ├── cost: 55.99 + ├── key: (6,7) + ├── fd: (1)-->(2), (1)==(6), (6)==(1) + ├── ordering: +2,+(1|6) [actual: +2,+1] + ├── distribution: test + ├── prune: (7) + ├── interesting orderings: (+2,+1) + ├── inner-join (lookup xy) + │ ├── columns: a.x:1 a.y:2 xy.x:6 xy.y:7 + │ ├── flags: force lookup join (into right side) + │ ├── key columns: [1] = [6] + │ ├── stats: [rows=10, distinct(1)=1, null(1)=0, avgsize(1)=1, distinct(6)=1, null(6)=0, avgsize(6)=4] + │ ├── cost: 55.96 + │ ├── key: (6,7) + │ ├── fd: (1)-->(2), (1)==(6), (6)==(1) + │ ├── ordering: +2,+(1|6) [actual: +2,+1] + │ ├── limit hint: 2.00 + │ ├── distribution: test + │ ├── prune: (2,7) + │ ├── interesting orderings: (+1) (+2,+1) (+6,+7) + │ ├── scan a@a_y_idx + │ │ ├── columns: a.x:1 a.y:2 + │ │ ├── stats: [rows=1, distinct(1)=1, null(1)=0, avgsize(1)=1] + │ │ 
├── cost: 15.04 + │ │ ├── key: (1) + │ │ ├── fd: (1)-->(2) + │ │ ├── ordering: +2,+1 + │ │ ├── limit hint: 1.00 + │ │ ├── distribution: test + │ │ ├── prune: (1,2) + │ │ ├── interesting orderings: (+1) (+2,+1) + │ │ └── unfiltered-cols: (1-5) + │ └── filters (true) + └── 2 + +# Perform a lookup join that preserves its input ordering. Make sure that the +# lookup join reads only two rows from kv. +query T +EXPLAIN ANALYZE SELECT a.x, a.y, xy.x, xy.y FROM a INNER LOOKUP JOIN xy ON xy.x = a.x ORDER BY a.y, a.x LIMIT 2 +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +rows read from KV: 5 (40 B, 5 gRPC calls) +maximum memory usage: +network usage: +regions: +· +• limit +│ nodes: +│ regions: +│ actual row count: 2 +│ KV time: 0µs +│ KV contention time: 0µs +│ KV rows read: 2 +│ KV bytes read: 16 B +│ KV gRPC calls: 2 +│ estimated max memory allocated: 0 B +│ estimated max sql temp disk usage: 0 B +│ count: 2 +│ +└── • lookup join + │ nodes: + │ regions: + │ actual row count: 2 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows read: 2 + │ KV bytes read: 16 B + │ KV gRPC calls: 2 + │ estimated max memory allocated: 0 B + │ estimated max sql temp disk usage: 0 B + │ table: xy@xy_pkey + │ equality: (x) = (x) + │ + └── • scan + nodes: + regions: + actual row count: 3 + KV time: 0µs + KV contention time: 0µs + KV rows read: 3 + KV bytes read: 24 B + KV gRPC calls: 3 + estimated max memory allocated: 0 B + estimated row count: 1 (100% of the table; stats collected ago) + table: a@a_y_idx + spans: FULL SCAN (SOFT LIMIT) diff --git a/pkg/sql/rowexec/joinreader_strategies.go b/pkg/sql/rowexec/joinreader_strategies.go index 36fde25c1455..bb55eddc9762 100644 --- a/pkg/sql/rowexec/joinreader_strategies.go +++ b/pkg/sql/rowexec/joinreader_strategies.go @@ -504,6 +504,16 @@ type joinReaderOrderingStrategy struct { // outputRowIdx contains the index into the inputRowIdx'th row of // inputRowIdxToLookedUpRowIndices that we're about to emit. 
outputRowIdx int + // notBufferedRow, if non-nil, contains a looked-up row that matches the + // first input row of the batch. Since joinReaderOrderingStrategy returns + // results in input order, it is safe to return looked-up rows that match + // the first input row immediately. + // TODO(drewk): If we had a way of knowing when no more lookups will be + // performed for a given span ID, it would be possible to start immediately + // returning results for the second row once the first was finished, and so + // on. This could significantly decrease the overhead of buffering looked + // up rows. + notBufferedRow rowenc.EncDatumRow } groupingState *inputBatchGroupingState @@ -527,6 +537,10 @@ type joinReaderOrderingStrategy struct { // inputRowIdxToLookedUpRowIndices is a 1:1 mapping with the multimap // with the same name, where each int64 indicates the memory usage of // the corresponding []int that is currently registered with memAcc. + // + // Note that inputRowIdxToLookedUpRowIndices does not contain entries for + // the first input row, because matches to the first row are emitted + // immediately. inputRowIdxToLookedUpRowIndices []int64 } @@ -619,8 +633,11 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow( ctx context.Context, row rowenc.EncDatumRow, spanID int, ) (joinReaderState, error) { matchingInputRowIndices := s.getMatchingRowIndices(spanID) + + // Avoid adding to the buffer if only the first input row was matched, since + // in this case we can just output the row immediately. var containerIdx int - if !s.isPartialJoin { + if !s.isPartialJoin && (len(matchingInputRowIndices) != 1 || matchingInputRowIndices[0] != 0) { // Replace missing values with nulls to appease the row container. for i := range row { if row[i].IsUnset() { @@ -637,6 +654,12 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow( // Update our map from input rows to looked up rows. 
for _, inputRowIdx := range matchingInputRowIndices { if !s.isPartialJoin { + if inputRowIdx == 0 { + // Don't add to inputRowIdxToLookedUpRowIndices in order to avoid + // emitting more than once. + s.emitCursor.notBufferedRow = row + continue + } s.inputRowIdxToLookedUpRowIndices[inputRowIdx] = append( s.inputRowIdxToLookedUpRowIndices[inputRowIdx], containerIdx) continue @@ -656,7 +679,14 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow( // We failed our on-condition. continue } - s.groupingState.setMatched(inputRowIdx) + wasMatched := s.groupingState.setMatched(inputRowIdx) + if !wasMatched && inputRowIdx == 0 { + // This looked up row matches the first row, and we haven't seen a match + // for the first row yet. Don't add to inputRowIdxToLookedUpRowIndices + // in order to avoid emitting more than once. + s.emitCursor.notBufferedRow = row + continue + } s.inputRowIdxToLookedUpRowIndices[inputRowIdx] = partialJoinSentinel } } @@ -673,6 +703,12 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow( return jrStateUnknown, err } + if s.emitCursor.notBufferedRow != nil { + // The looked up row matched the first input row. Render and emit them + // immediately, then return to performing lookups. + return jrEmittingRows, nil + } + return jrPerformingLookup, nil } @@ -699,7 +735,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit( inputRow := s.inputRows[s.emitCursor.inputRowIdx] lookedUpRows := s.inputRowIdxToLookedUpRowIndices[s.emitCursor.inputRowIdx] - if s.emitCursor.outputRowIdx >= len(lookedUpRows) { + if s.emitCursor.notBufferedRow == nil && s.emitCursor.outputRowIdx >= len(lookedUpRows) { // We have no more rows for the current input row. Emit an outer or anti // row if we didn't see a match, and bump to the next input row. 
inputRowIdx := s.emitCursor.inputRowIdx @@ -725,20 +761,39 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit( return nil, jrEmittingRows, nil } - lookedUpRowIdx := lookedUpRows[s.emitCursor.outputRowIdx] - s.emitCursor.outputRowIdx++ + var nextState joinReaderState + if s.emitCursor.notBufferedRow != nil { + // Make sure we return to looking up rows after outputting one that matches + // the first input row. + nextState = jrPerformingLookup + defer func() { s.emitCursor.notBufferedRow = nil }() + } else { + // All lookups have finished, and we are currently iterating through the + // input rows and emitting them. + nextState = jrEmittingRows + defer func() { s.emitCursor.outputRowIdx++ }() + } + switch s.joinType { case descpb.LeftSemiJoin: // A semi-join match means we emit our input row. This is the case where // we used the partialJoinSentinel. - return inputRow, jrEmittingRows, nil + return inputRow, nextState, nil case descpb.LeftAntiJoin: // An anti-join match means we emit nothing. This is the case where // we used the partialJoinSentinel. 
- return nil, jrEmittingRows, nil + return nil, nextState, nil } - lookedUpRow, err := s.lookedUpRows.GetRow(s.Ctx, lookedUpRowIdx, false /* skip */) + var err error + var lookedUpRow rowenc.EncDatumRow + if s.emitCursor.notBufferedRow != nil { + lookedUpRow = s.emitCursor.notBufferedRow + } else { + lookedUpRow, err = s.lookedUpRows.GetRow( + s.Ctx, lookedUpRows[s.emitCursor.outputRowIdx], false, /* skip */ + ) + } if err != nil { return nil, jrStateUnknown, err } @@ -758,7 +813,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit( } } } - return outputRow, jrEmittingRows, nil + return outputRow, nextState, nil } func (s *joinReaderOrderingStrategy) spilled() bool { diff --git a/pkg/sql/rowexec/joinreader_test.go b/pkg/sql/rowexec/joinreader_test.go index 325fa7ff6593..28ca3214cdc7 100644 --- a/pkg/sql/rowexec/joinreader_test.go +++ b/pkg/sql/rowexec/joinreader_test.go @@ -1288,9 +1288,11 @@ CREATE TABLE test.t (a INT, s STRING, INDEX (a, s))`); err != nil { // DiskBackedIndexedRowContainer. flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = mon.DefaultPoolAllocationSize - // Input row is just a single 0. + // The two input rows are just a single 0 each. We use two input rows because + // matches to the first input row are never buffered. inputRows := rowenc.EncDatumRows{ rowenc.EncDatumRow{rowenc.EncDatum{Datum: tree.NewDInt(tree.DInt(key))}}, + rowenc.EncDatumRow{rowenc.EncDatum{Datum: tree.NewDInt(tree.DInt(key))}}, } var fetchSpec descpb.IndexFetchSpec if err := rowenc.InitIndexFetchSpec( @@ -1341,7 +1343,7 @@ CREATE TABLE test.t (a INT, s STRING, INDEX (a, s))`); err != nil { require.Equal(t, expected, actual) count++ } - require.Equal(t, numRows, count) + require.Equal(t, numRows*len(inputRows), count) require.True(t, jr.(*joinReader).Spilled()) }