Skip to content

Commit

Permalink
rowexec: allow ordered joinReader to stream matches to the first row
Browse files Browse the repository at this point in the history
Currently the `joinReaderOrderingStrategy` implementation buffers all looked up
rows before matching them with input rows and emitting them. This is necessary
because the looked up rows may not be received in input order (which must be
maintained). However, rows that match the first input row can be emitted
immediately. When many rows match the first input row, this decreases the
overhead of the buffer. Additionally, this change can allow a limit to be
satisfied earlier, which can significantly decrease latency. This is
especially advantageous when there is only one input row, since all looked-up
rows can then be rendered and returned in streaming fashion.

Release note (performance improvement): The execution engine can now
short-circuit execution of lookup joins in more cases, which can decrease
latency for queries with limits.
  • Loading branch information
DrewKimball committed Aug 8, 2022
1 parent 49494e5 commit d413fe7
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 11 deletions.
92 changes: 92 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ CREATE TABLE a (x INT PRIMARY KEY, y INT, z INT, INDEX (y));
CREATE TABLE b (x INT PRIMARY KEY);
INSERT INTO a VALUES (1, 1, 1), (2, 1, 1), (3, 2, 2), (4, 2, 2);
INSERT INTO b VALUES (1), (2), (3), (4);
CREATE TABLE xy (x INT, y INT, PRIMARY KEY(x, y));
INSERT INTO xy VALUES (1, 1), (1, 2), (1, 3), (2, 1);

# Query with an index join and a limit hint.
query T
Expand Down Expand Up @@ -344,3 +346,93 @@ regions: <hidden>
estimated row count: 1 (100% of the table; stats collected <hidden> ago)
table: a@a_y_idx
spans: FULL SCAN (SOFT LIMIT)

# Query with a lookup join and a limit. The lookup join has to preserve the
# input ordering.
query T
EXPLAIN (OPT, VERBOSE) SELECT a.x, a.y, xy.x, xy.y FROM a INNER LOOKUP JOIN xy ON xy.x = a.x ORDER BY a.y, a.x LIMIT 2
----
limit
├── columns: x:1 y:2 x:6 y:7
├── internal-ordering: +2,+(1|6)
├── cardinality: [0 - 2]
├── stats: [rows=2]
├── cost: 55.99
├── key: (6,7)
├── fd: (1)-->(2), (1)==(6), (6)==(1)
├── ordering: +2,+(1|6) [actual: +2,+1]
├── distribution: test
├── prune: (7)
├── interesting orderings: (+2,+1)
├── inner-join (lookup xy)
│ ├── columns: a.x:1 a.y:2 xy.x:6 xy.y:7
│ ├── flags: force lookup join (into right side)
│ ├── key columns: [1] = [6]
│ ├── stats: [rows=10, distinct(1)=1, null(1)=0, avgsize(1)=1, distinct(6)=1, null(6)=0, avgsize(6)=4]
│ ├── cost: 55.96
│ ├── key: (6,7)
│ ├── fd: (1)-->(2), (1)==(6), (6)==(1)
│ ├── ordering: +2,+(1|6) [actual: +2,+1]
│ ├── limit hint: 2.00
│ ├── distribution: test
│ ├── prune: (2,7)
│ ├── interesting orderings: (+1) (+2,+1) (+6,+7)
│ ├── scan a@a_y_idx
│ │ ├── columns: a.x:1 a.y:2
│ │ ├── stats: [rows=1, distinct(1)=1, null(1)=0, avgsize(1)=1]
│ │ ├── cost: 15.04
│ │ ├── key: (1)
│ │ ├── fd: (1)-->(2)
│ │ ├── ordering: +2,+1
│ │ ├── limit hint: 1.00
│ │ ├── distribution: test
│ │ ├── prune: (1,2)
│ │ ├── interesting orderings: (+1) (+2,+1)
│ │ └── unfiltered-cols: (1-5)
│ └── filters (true)
└── 2

# Perform a lookup join that preserves its input ordering. Make sure that the
# lookup join reads only two rows from kv (the input scan reads its own rows).
query T
EXPLAIN ANALYZE SELECT a.x, a.y, xy.x, xy.y FROM a INNER LOOKUP JOIN xy ON xy.x = a.x ORDER BY a.y, a.x LIMIT 2
----
planning time: 10µs
execution time: 100µs
distribution: <hidden>
vectorized: <hidden>
rows read from KV: 5 (40 B, 5 gRPC calls)
maximum memory usage: <hidden>
network usage: <hidden>
regions: <hidden>
·
• limit
│ count: 2
└── • lookup join
│ nodes: <hidden>
│ regions: <hidden>
│ actual row count: 2
│ KV time: 0µs
│ KV contention time: 0µs
│ KV rows read: 2
│ KV bytes read: 16 B
│ KV gRPC calls: 2
│ estimated max memory allocated: 0 B
│ estimated max sql temp disk usage: 0 B
│ table: xy@xy_pkey
│ equality: (x) = (x)
└── • scan
nodes: <hidden>
regions: <hidden>
actual row count: 3
KV time: 0µs
KV contention time: 0µs
KV rows read: 3
KV bytes read: 24 B
KV gRPC calls: 3
estimated max memory allocated: 0 B
estimated row count: 1 (100% of the table; stats collected <hidden> ago)
table: a@a_y_idx
spans: FULL SCAN (SOFT LIMIT)
73 changes: 64 additions & 9 deletions pkg/sql/rowexec/joinreader_strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,16 @@ type joinReaderOrderingStrategy struct {
// outputRowIdx contains the index into the inputRowIdx'th row of
// inputRowIdxToLookedUpRowIndices that we're about to emit.
outputRowIdx int
// notBufferedRow, if non-nil, contains a looked-up row that matches the
// first input row of the batch. Since joinReaderOrderingStrategy returns
// results in input order, it is safe to return looked-up rows that match
// the first input row immediately.
// TODO(drewk): If we had a way of knowing when no more lookups will be
// performed for a given span ID, it would be possible to start immediately
// returning results for the second row once the first was finished, and so
// on. This could significantly decrease the overhead of buffering looked
// up rows.
notBufferedRow rowenc.EncDatumRow
}

groupingState *inputBatchGroupingState
Expand All @@ -527,6 +537,10 @@ type joinReaderOrderingStrategy struct {
// inputRowIdxToLookedUpRowIndices is a 1:1 mapping with the multimap
// with the same name, where each int64 indicates the memory usage of
// the corresponding []int that is currently registered with memAcc.
//
// Note that inputRowIdxToLookedUpRowIndices does not contain entries for
// the first input row, because matches to the first row are emitted
// immediately.
inputRowIdxToLookedUpRowIndices []int64
}

Expand Down Expand Up @@ -619,8 +633,11 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow(
ctx context.Context, row rowenc.EncDatumRow, spanID int,
) (joinReaderState, error) {
matchingInputRowIndices := s.getMatchingRowIndices(spanID)

// Avoid adding to the buffer if only the first input row was matched, since
// in this case we can just output the row immediately.
var containerIdx int
if !s.isPartialJoin {
if !s.isPartialJoin && (len(matchingInputRowIndices) != 1 || matchingInputRowIndices[0] != 0) {
// Replace missing values with nulls to appease the row container.
for i := range row {
if row[i].IsUnset() {
Expand All @@ -637,6 +654,12 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow(
// Update our map from input rows to looked up rows.
for _, inputRowIdx := range matchingInputRowIndices {
if !s.isPartialJoin {
if inputRowIdx == 0 {
// Don't add to inputRowIdxToLookedUpRowIndices in order to avoid
// emitting more than once.
s.emitCursor.notBufferedRow = row
continue
}
s.inputRowIdxToLookedUpRowIndices[inputRowIdx] = append(
s.inputRowIdxToLookedUpRowIndices[inputRowIdx], containerIdx)
continue
Expand All @@ -656,7 +679,14 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow(
// We failed our on-condition.
continue
}
s.groupingState.setMatched(inputRowIdx)
wasMatched := s.groupingState.setMatched(inputRowIdx)
if !wasMatched && inputRowIdx == 0 {
// This looked up row matches the first row, and we haven't seen a match
// for the first row yet. Don't add to inputRowIdxToLookedUpRowIndices
// in order to avoid emitting more than once.
s.emitCursor.notBufferedRow = row
continue
}
s.inputRowIdxToLookedUpRowIndices[inputRowIdx] = partialJoinSentinel
}
}
Expand All @@ -673,6 +703,12 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow(
return jrStateUnknown, err
}

if s.emitCursor.notBufferedRow != nil {
// The looked up row matched the first input row. Render and emit them
// immediately, then return to performing lookups.
return jrEmittingRows, nil
}

return jrPerformingLookup, nil
}

Expand All @@ -699,7 +735,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit(

inputRow := s.inputRows[s.emitCursor.inputRowIdx]
lookedUpRows := s.inputRowIdxToLookedUpRowIndices[s.emitCursor.inputRowIdx]
if s.emitCursor.outputRowIdx >= len(lookedUpRows) {
if s.emitCursor.notBufferedRow == nil && s.emitCursor.outputRowIdx >= len(lookedUpRows) {
// We have no more rows for the current input row. Emit an outer or anti
// row if we didn't see a match, and bump to the next input row.
inputRowIdx := s.emitCursor.inputRowIdx
Expand All @@ -725,20 +761,39 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit(
return nil, jrEmittingRows, nil
}

lookedUpRowIdx := lookedUpRows[s.emitCursor.outputRowIdx]
s.emitCursor.outputRowIdx++
var nextState joinReaderState
if s.emitCursor.notBufferedRow != nil {
// Make sure we return to looking up rows after outputting one that matches
// the first input row.
nextState = jrPerformingLookup
defer func() { s.emitCursor.notBufferedRow = nil }()
} else {
// All lookups have finished, and we are currently iterating through the
// input rows and emitting them.
nextState = jrEmittingRows
defer func() { s.emitCursor.outputRowIdx++ }()
}

switch s.joinType {
case descpb.LeftSemiJoin:
// A semi-join match means we emit our input row. This is the case where
// we used the partialJoinSentinel.
return inputRow, jrEmittingRows, nil
return inputRow, nextState, nil
case descpb.LeftAntiJoin:
// An anti-join match means we emit nothing. This is the case where
// we used the partialJoinSentinel.
return nil, jrEmittingRows, nil
return nil, nextState, nil
}

lookedUpRow, err := s.lookedUpRows.GetRow(s.Ctx, lookedUpRowIdx, false /* skip */)
var err error
var lookedUpRow rowenc.EncDatumRow
if s.emitCursor.notBufferedRow != nil {
lookedUpRow = s.emitCursor.notBufferedRow
} else {
lookedUpRow, err = s.lookedUpRows.GetRow(
s.Ctx, lookedUpRows[s.emitCursor.outputRowIdx], false, /* skip */
)
}
if err != nil {
return nil, jrStateUnknown, err
}
Expand All @@ -758,7 +813,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit(
}
}
}
return outputRow, jrEmittingRows, nil
return outputRow, nextState, nil
}

func (s *joinReaderOrderingStrategy) spilled() bool {
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/rowexec/joinreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1288,9 +1288,11 @@ CREATE TABLE test.t (a INT, s STRING, INDEX (a, s))`); err != nil {
// DiskBackedIndexedRowContainer.
flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = mon.DefaultPoolAllocationSize

// Input row is just a single 0.
// The two input rows are just a single 0 each. We use two input rows because
// matches to the first input row are never buffered.
inputRows := rowenc.EncDatumRows{
rowenc.EncDatumRow{rowenc.EncDatum{Datum: tree.NewDInt(tree.DInt(key))}},
rowenc.EncDatumRow{rowenc.EncDatum{Datum: tree.NewDInt(tree.DInt(key))}},
}
var fetchSpec descpb.IndexFetchSpec
if err := rowenc.InitIndexFetchSpec(
Expand Down Expand Up @@ -1341,7 +1343,7 @@ CREATE TABLE test.t (a INT, s STRING, INDEX (a, s))`); err != nil {
require.Equal(t, expected, actual)
count++
}
require.Equal(t, numRows, count)
require.Equal(t, numRows*len(inputRows), count)
require.True(t, jr.(*joinReader).Spilled())
}

Expand Down

0 comments on commit d413fe7

Please sign in to comment.