From fe14b6298df15b19ba621d5498c36c43feac3098 Mon Sep 17 00:00:00 2001 From: DrewKimball Date: Mon, 8 Aug 2022 03:08:10 -0700 Subject: [PATCH] rowexec: allow ordered joinReader to stream matches to the first row Currently the `joinReaderOrderingStrategy` implementation buffers all looked up rows before matching them with input rows and emitting them. This is necessary because the looked up rows may not be received in input order (which must be maintained). However, rows that match the first input row can be emitted immediately. In the case when there are many rows that match the first input row, this can decrease overhead of the buffer. Additionally, this change can allow a limit to be satisfied earlier, which can significantly decrease latency. This is especially advantageous in the case when there is only one input row, since all lookups can then be rendered and returned in streaming fashion. Release note (performance improvement): The execution engine can now short-circuit execution of lookup joins in more cases, which can decrease latency for queries with limits. --- .../execbuilder/testdata/lookup_join_limit | 102 ++++++++++++++++++ pkg/sql/rowexec/joinreader_strategies.go | 69 ++++++++++-- 2 files changed, 162 insertions(+), 9 deletions(-) diff --git a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit index 00f3203c2f5f..bdb4577c1498 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit +++ b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit @@ -8,6 +8,8 @@ CREATE TABLE a (x INT PRIMARY KEY, y INT, z INT, INDEX (y)); CREATE TABLE b (x INT PRIMARY KEY); INSERT INTO a VALUES (1, 1, 1), (2, 1, 1), (3, 2, 2), (4, 2, 2); INSERT INTO b VALUES (1), (2), (3), (4); +CREATE TABLE xy (x INT, y INT, PRIMARY KEY(x, y)); +INSERT INTO xy VALUES (1, 1), (1, 2), (1, 3), (2, 1); # Query with an index join and a limit hint. query T @@ -359,3 +361,103 @@ regions: estimated row count: 1 (100% of the table; stats collected ago) table: a@a_y_idx spans: FULL SCAN (SOFT LIMIT) + +# Query with a lookup join and a limit. The lookup join has to preserve the +# input ordering. +query T +EXPLAIN (OPT, VERBOSE) SELECT a.x, a.y, xy.x, xy.y FROM a INNER LOOKUP JOIN xy ON xy.x = a.x ORDER BY a.y, a.x LIMIT 2 +---- +limit + ├── columns: x:1 y:2 x:6 y:7 + ├── internal-ordering: +2,+(1|6) + ├── cardinality: [0 - 2] + ├── stats: [rows=2] + ├── cost: 55.99 + ├── key: (6,7) + ├── fd: (1)-->(2), (1)==(6), (6)==(1) + ├── ordering: +2,+(1|6) [actual: +2,+1] + ├── distribution: test + ├── prune: (7) + ├── interesting orderings: (+2,+1) + ├── inner-join (lookup xy) + │ ├── columns: a.x:1 a.y:2 xy.x:6 xy.y:7 + │ ├── flags: force lookup join (into right side) + │ ├── key columns: [1] = [6] + │ ├── stats: [rows=10, distinct(1)=1, null(1)=0, avgsize(1)=1, distinct(6)=1, null(6)=0, avgsize(6)=4] + │ ├── cost: 55.96 + │ ├── key: (6,7) + │ ├── fd: (1)-->(2), (1)==(6), (6)==(1) + │ ├── ordering: +2,+(1|6) [actual: +2,+1] + │ ├── limit hint: 2.00 + │ ├── distribution: test + │ ├── prune: (2,7) + │ ├── interesting orderings: (+1) (+2,+1) (+6,+7) + │ ├── scan a@a_y_idx + │ │ ├── columns: a.x:1 a.y:2 + │ │ ├── stats: [rows=1, distinct(1)=1, null(1)=0, avgsize(1)=1] + │ │ ├── cost: 15.04 + │ │ ├── key: (1) + │ │ ├── fd: (1)-->(2) + │ │ ├── ordering: +2,+1 + │ │ ├── limit hint: 1.00 + │ │ ├── distribution: test + │ │ ├── prune: (1,2) + │ │ ├── interesting orderings: (+1) (+2,+1) + │ │ └── unfiltered-cols: (1-5) + │ └── filters (true) + └── 2 + +# Perform a lookup join that preserves its input ordering. Make sure that only +# two rows are read from kv. +query T +EXPLAIN ANALYZE SELECT a.x, a.y, xy.x, xy.y FROM a INNER LOOKUP JOIN xy ON xy.x = a.x ORDER BY a.y, a.x LIMIT 2 +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +rows read from KV: 5 (40 B, 5 gRPC calls) +maximum memory usage: +network usage: +regions: +· +• limit +│ nodes: +│ regions: +│ actual row count: 2 +│ KV time: 0µs +│ KV contention time: 0µs +│ KV rows read: 2 +│ KV bytes read: 16 B +│ KV gRPC calls: 2 +│ estimated max memory allocated: 0 B +│ estimated max sql temp disk usage: 0 B +│ count: 2 +│ +└── • lookup join + │ nodes: + │ regions: + │ actual row count: 2 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows read: 2 + │ KV bytes read: 16 B + │ KV gRPC calls: 2 + │ estimated max memory allocated: 0 B + │ estimated max sql temp disk usage: 0 B + │ table: xy@xy_pkey + │ equality: (x) = (x) + │ + └── • scan + nodes: + regions: + actual row count: 3 + KV time: 0µs + KV contention time: 0µs + KV rows read: 3 + KV bytes read: 24 B + KV gRPC calls: 3 + estimated max memory allocated: 0 B + estimated row count: 1 (100% of the table; stats collected ago) + table: a@a_y_idx + spans: FULL SCAN (SOFT LIMIT) diff --git a/pkg/sql/rowexec/joinreader_strategies.go b/pkg/sql/rowexec/joinreader_strategies.go index 36fde25c1455..4a39cf3cb965 100644 --- a/pkg/sql/rowexec/joinreader_strategies.go +++ b/pkg/sql/rowexec/joinreader_strategies.go @@ -504,6 +504,16 @@ type joinReaderOrderingStrategy struct { // outputRowIdx contains the index into the inputRowIdx'th row of // inputRowIdxToLookedUpRowIndices that we're about to emit. outputRowIdx int + // unbufferedRow, if non-nil, contains a looked-up row that matches the + // first input row of the batch. Since joinReaderOrderingStrategy returns + // results in input order, it is safe to return looked-up rows that match + // the first input row immediately. + // TODO(drewk): If we had a way of knowing when no more lookups will be + // performed for a given span ID, it would be possible to start immediately + // returning results for the second row once the first was finished, and so + // on. This could significantly decrease the overhead of buffering looked + // up rows. + unbufferedRow rowenc.EncDatumRow } groupingState *inputBatchGroupingState @@ -619,8 +629,11 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow( ctx context.Context, row rowenc.EncDatumRow, spanID int, ) (joinReaderState, error) { matchingInputRowIndices := s.getMatchingRowIndices(spanID) + + // Avoid adding to the buffer if only the first input row was matched, since + // in this case we can just output the row immediately. var containerIdx int - if !s.isPartialJoin { + if !s.isPartialJoin && (len(matchingInputRowIndices) != 1 || matchingInputRowIndices[0] != 0) { // Replace missing values with nulls to appease the row container. for i := range row { if row[i].IsUnset() { @@ -637,6 +650,12 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow( // Update our map from input rows to looked up rows. for _, inputRowIdx := range matchingInputRowIndices { if !s.isPartialJoin { + if inputRowIdx == 0 { + // Don't add to inputRowIdxToLookedUpRowIndices in order to avoid + // emitting more than once. + s.emitCursor.unbufferedRow = row + continue + } s.inputRowIdxToLookedUpRowIndices[inputRowIdx] = append( s.inputRowIdxToLookedUpRowIndices[inputRowIdx], containerIdx) continue @@ -656,7 +675,14 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow( // We failed our on-condition. continue } - s.groupingState.setMatched(inputRowIdx) + wasMatched := s.groupingState.setMatched(inputRowIdx) + if !wasMatched && inputRowIdx == 0 { + // This looked up row matches the first row, and we haven't seen a match + // for the first row yet. Don't add to inputRowIdxToLookedUpRowIndices + // in order to avoid emitting more than once. + s.emitCursor.unbufferedRow = row + continue + } s.inputRowIdxToLookedUpRowIndices[inputRowIdx] = partialJoinSentinel } } @@ -673,6 +699,12 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow( return jrStateUnknown, err } + if s.emitCursor.unbufferedRow != nil { + // The looked up row matched the first input row. Render and emit them + // immediately, then return to performing lookups. + return jrEmittingRows, nil + } + return jrPerformingLookup, nil } @@ -699,7 +731,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit( inputRow := s.inputRows[s.emitCursor.inputRowIdx] lookedUpRows := s.inputRowIdxToLookedUpRowIndices[s.emitCursor.inputRowIdx] - if s.emitCursor.outputRowIdx >= len(lookedUpRows) { + if s.emitCursor.unbufferedRow == nil && s.emitCursor.outputRowIdx >= len(lookedUpRows) { // We have no more rows for the current input row. Emit an outer or anti // row if we didn't see a match, and bump to the next input row. inputRowIdx := s.emitCursor.inputRowIdx @@ -725,20 +757,39 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit( return nil, jrEmittingRows, nil } - lookedUpRowIdx := lookedUpRows[s.emitCursor.outputRowIdx] - s.emitCursor.outputRowIdx++ + var nextState joinReaderState + if s.emitCursor.unbufferedRow != nil { + // Make sure we return to looking up rows after outputting one that matches + // the first input row. + nextState = jrPerformingLookup + defer func() { s.emitCursor.unbufferedRow = nil }() + } else { + // All lookups have finished, and we are currently iterating through the + // input rows and emitting them. + nextState = jrEmittingRows + defer func() { s.emitCursor.outputRowIdx++ }() + } + switch s.joinType { case descpb.LeftSemiJoin: // A semi-join match means we emit our input row. This is the case where // we used the partialJoinSentinel. - return inputRow, jrEmittingRows, nil + return inputRow, nextState, nil case descpb.LeftAntiJoin: // An anti-join match means we emit nothing. This is the case where // we used the partialJoinSentinel. - return nil, jrEmittingRows, nil + return nil, nextState, nil } - lookedUpRow, err := s.lookedUpRows.GetRow(s.Ctx, lookedUpRowIdx, false /* skip */) + var err error + var lookedUpRow rowenc.EncDatumRow + if s.emitCursor.unbufferedRow != nil { + lookedUpRow = s.emitCursor.unbufferedRow + } else { + lookedUpRow, err = s.lookedUpRows.GetRow( + s.Ctx, lookedUpRows[s.emitCursor.outputRowIdx], false, /* skip */ + ) + } if err != nil { return nil, jrStateUnknown, err } @@ -758,7 +809,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit( } } } - return outputRow, jrEmittingRows, nil + return outputRow, nextState, nil } func (s *joinReaderOrderingStrategy) spilled() bool {