Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
25815: distsql: add batching for secondary lookup joins r=solongordon a=solongordon

Lookup joins on non-covering secondary indexes were previously making a
separate primary index scan for every secondary index row. Now those
scans are grouped together into batches of up to 100 spans.

Release note: None

Co-authored-by: Solon Gordon <[email protected]>
  • Loading branch information
craig[bot] and solongordon committed May 23, 2018
2 parents 7d99811 + 67ae2ab commit aa02b29
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 71 deletions.
239 changes: 169 additions & 70 deletions pkg/sql/distsqlrun/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ type joinReader struct {
index *sqlbase.IndexDescriptor
colIdxMap map[sqlbase.ColumnID]int

fetcher sqlbase.RowFetcher
alloc sqlbase.DatumAlloc
fetcher sqlbase.RowFetcher
alloc sqlbase.DatumAlloc
rowAlloc sqlbase.EncDatumRowAlloc

input RowSource
inputTypes []sqlbase.ColumnType
Expand All @@ -71,6 +72,9 @@ type joinReader struct {
primaryFetcher *sqlbase.RowFetcher
primaryColumnTypes []sqlbase.ColumnType
primaryKeyPrefix []byte

// Batch size for fetches. Not a constant so we can lower for testing.
batchSize int
}

var _ Processor = &joinReader{}
Expand All @@ -90,6 +94,7 @@ func newJoinReader(
input: input,
inputTypes: input.OutputTypes(),
lookupCols: spec.LookupColumns,
batchSize: joinReaderBatchSize,
}

var err error
Expand Down Expand Up @@ -282,8 +287,8 @@ func (jr *joinReader) mainLoop(ctx context.Context) error {
primaryKeyPrefix := sqlbase.MakeIndexKeyPrefix(&jr.desc, jr.index.ID)

var alloc sqlbase.DatumAlloc
rows := make([]sqlbase.EncDatumRow, 0, joinReaderBatchSize)
spans := make(roachpb.Spans, 0, joinReaderBatchSize)
var rows []sqlbase.EncDatumRow
var spans roachpb.Spans

txn := jr.flowCtx.txn
if txn == nil {
Expand All @@ -295,15 +300,15 @@ func (jr *joinReader) mainLoop(ctx context.Context) error {
defer log.Infof(ctx, "exiting")
}

spanToRowIndices := make(map[string][]int)
for {
// TODO(radu): figure out how to send smaller batches if the source has
// a soft limit (perhaps send the batch out if we don't get a result
// within a certain amount of time).
rowIdx := 0
rows = rows[:0]
spans = spans[:0]
for len(spans) < joinReaderBatchSize {
spanToRowIndices := make(map[string][]int, joinReaderBatchSize)
for len(spans) < jr.batchSize {
row, meta := jr.input.Next()
if meta != nil {
if meta.Err != nil {
Expand Down Expand Up @@ -348,13 +353,20 @@ func (jr *joinReader) mainLoop(ctx context.Context) error {
// TODO(radu): we are consuming all results from a fetch before starting
// the next batch. We could start the next batch early while we are
// outputting rows.
if earlyExit, err := jr.indexLookup(ctx, txn, spans, rows, spanToRowIndices); err != nil {
var earlyExit bool
var err error
if jr.isLookupJoin() {
earlyExit, err = jr.lookupJoinLookup(ctx, txn, spans, rows, spanToRowIndices)
} else {
earlyExit, err = jr.indexJoinLookup(ctx, txn, spans)
}
if err != nil {
return err
} else if earlyExit {
return nil
}

if len(spans) != joinReaderBatchSize {
if len(spans) != jr.batchSize {
// This was the last batch.
jr.pushTrailingMeta(ctx)
jr.out.Close()
Expand All @@ -369,18 +381,55 @@ func (jr *joinReader) isLookupJoin() bool {
return len(jr.lookupCols) > 0
}

// Index lookup iterates through all matches of the given spans and emits the
// corresponding row.
// indexJoinLookup performs an index lookup for the purposes of an index join.
// It fetches the specified spans from the primary index and emits the results.
//
// Returns false if more rows need to be produced, true otherwise. If true is
// returned, both the inputs and the output have been drained and closed, except
// if an error is returned.
func (jr *joinReader) indexJoinLookup(
ctx context.Context, txn *client.Txn, spans roachpb.Spans,
) (bool, error) {
// TODO(radu,andrei,knz): set the traceKV flag when requested by the session.
err := jr.fetcher.StartScan(
ctx, txn, spans, false /* limitBatches */, 0 /* limitHint */, false /* traceKV */)
if err != nil {
log.Errorf(ctx, "scan error: %s", err)
return true, err
}
for {
row, _, _, err := jr.fetcher.NextRow(ctx)
if err != nil {
err = scrub.UnwrapScrubError(err)
return true, err
}
if row == nil {
// Done with this batch.
break
}
// Emit the row; stop if no more rows are needed.
if !emitHelper(ctx, &jr.out, row, nil /* meta */, jr.pushTrailingMeta, jr.input) {
return true, nil
}
}
return false, nil
}

// lookupJoinLookup performs an index lookup for the purposes of a lookup join.
// `spans` is the set of spans which should be fetched from the index. `rows` is
// the corresponding input rows. `spanToRowIndices` maps span keys onto the
// corresponding `rows` indices.
//
// For lookup joins, the `rows` and `spanToRowIndices` arguments must be
// provided. `rows` is the input rows which correspond to the specified spans.
// `spanToRowIndices` maps span keys onto the indices of the corresponding rows
// in the `rows` slice. For index joins, these arguments are ignored.
// Note that if jr.primaryFetcher is non-nil, this function will perform two
// lookups: one to fetch rows from a secondary index, and a second to fetch the
// corresponding primary rows. This is for lookup joins on a secondary index
// which does not cover all the needed output columns. (Due to batching it may
// actually perform multiple primary lookups.)
//
// Returns false if more rows need to be produced, true otherwise. If true is
// returned, both the inputs and the output have been drained and closed, except
// if an error is returned.
func (jr *joinReader) indexLookup(
func (jr *joinReader) lookupJoinLookup(
ctx context.Context,
txn *client.Txn,
spans roachpb.Spans,
Expand All @@ -389,13 +438,7 @@ func (jr *joinReader) indexLookup(
) (bool, error) {
// TODO(radu,andrei,knz): set the traceKV flag when requested by the session.
err := jr.fetcher.StartScan(
ctx,
txn,
spans,
false, /* no batch limits */
0,
false, /* traceKV */
)
ctx, txn, spans, false /* limitBatches */, 0 /* limitHint */, false /* traceKV */)
if err != nil {
log.Errorf(ctx, "scan error: %s", err)
return true, err
Expand All @@ -411,45 +454,65 @@ func (jr *joinReader) indexLookup(
emitted = make([]bool, len(rows))
}

// lookupRow represents an index key and the corresponding row.
type lookupRow struct {
key string
row sqlbase.EncDatumRow
}
lookupRows := make([]lookupRow, 0, joinReaderBatchSize)

// The index scan may have returned more rows than len(spans), so process the
// results in batches.
for {
indexKey := jr.fetcher.IndexKeyString(len(jr.lookupCols))
indexRow, _, _, err := jr.fetcher.NextRow(ctx)
if err != nil {
err = scrub.UnwrapScrubError(err)
return true, err
}
if indexRow == nil {
// Done with this batch.
break
// Get the next batch of lookup results.
lookupRows = lookupRows[:0]
for len(lookupRows) < joinReaderBatchSize {
key := jr.fetcher.IndexKeyString(len(jr.lookupCols))
row, _, _, err := jr.fetcher.NextRow(ctx)
if err != nil {
err = scrub.UnwrapScrubError(err)
return true, err
}
if row == nil {
// Done with this batch.
break
}
lookupRows = append(lookupRows, lookupRow{key, jr.rowAlloc.CopyRow(row)})
}

rowIndices := spanToRowIndices[indexKey]

if !jr.isLookupJoin() {
// Emit the row; stop if no more rows are needed.
if !emitHelper(ctx, &jr.out, indexRow, nil /* meta */, jr.pushTrailingMeta, jr.input) {
return true, nil
if jr.primaryFetcher != nil {
// The lookup was on a non-covering secondary index, so we need to do a
// second lookup against the primary index and replace our previous
// results with the primary rows.
// TODO(solon): Allocate this up front rather than once per batch.
secondaryIndexRows := make([]sqlbase.EncDatumRow, len(lookupRows))
for i := range lookupRows {
secondaryIndexRows[i] = lookupRows[i].row
}
} else {
if jr.primaryFetcher != nil {
// TODO(solon): Batch these primary index scans.
indexRow, err = jr.primaryLookup(ctx, txn, indexRow)
if err != nil {
return false, err
}
primaryRows, err := jr.primaryLookup(ctx, txn, secondaryIndexRows)
if err != nil {
return false, err
}
for i := range primaryRows {
lookupRows[i].row = primaryRows[i]
}
}

// Iterate over the lookup results, map them to the input rows, and emit the
// rendered rows.
for _, lookupRow := range lookupRows {
if jr.indexFilter.expr != nil {
// Apply index filter.
res, err := jr.indexFilter.evalFilter(indexRow)
res, err := jr.indexFilter.evalFilter(lookupRow.row)
if err != nil {
return false, err
}
if !res {
continue
}
}
for _, rowIdx := range rowIndices {
renderedRow, err := jr.render(rows[rowIdx], indexRow)
for _, rowIdx := range spanToRowIndices[lookupRow.key] {
renderedRow, err := jr.render(rows[rowIdx], lookupRow.row)
if err != nil {
return false, err
}
Expand All @@ -468,6 +531,11 @@ func (jr *joinReader) indexLookup(
}
}
}

if len(lookupRows) < joinReaderBatchSize {
// This was the last batch.
break
}
}

if emitted != nil {
Expand All @@ -486,44 +554,75 @@ func (jr *joinReader) indexLookup(
return false, nil
}

// primaryLookup looks up the corresponding primary index row, given a secondary
// index row.
// primaryLookup looks up the corresponding primary index rows, given a batch of
// secondary index rows. Since we expect a 1-1 correspondence between the input
// and output, it returns a slice of rows where each row corresponds to the
// input with the same slice index.
func (jr *joinReader) primaryLookup(
ctx context.Context, txn *client.Txn, secondaryIndexRow sqlbase.EncDatumRow,
) (sqlbase.EncDatumRow, error) {
index := jr.desc.PrimaryIndex
keyValues := make(sqlbase.EncDatumRow, len(index.ColumnIDs))
for i, columnID := range index.ColumnIDs {
keyValues[i] = secondaryIndexRow[jr.colIdxMap[columnID]]
ctx context.Context, txn *client.Txn, secondaryIndexRows []sqlbase.EncDatumRow,
) ([]sqlbase.EncDatumRow, error) {
batchSize := len(secondaryIndexRows)
if batchSize == 0 {
return nil, nil
}
key, err := sqlbase.MakeKeyFromEncDatums(
jr.primaryColumnTypes, keyValues, &jr.desc, &index, jr.primaryKeyPrefix, &jr.alloc)
if err != nil {
return nil, err
numKeyCols := len(jr.desc.PrimaryIndex.ColumnIDs)
// keyToInputRowIdx maps primary index keys to the input rows.
keyToInputRowIdx := make(map[string]int, batchSize)
outRows := make([]sqlbase.EncDatumRow, batchSize)

// Build spans for the primary index lookup.
spans := make([]roachpb.Span, batchSize)
for rowIdx, row := range secondaryIndexRows {
values := make(sqlbase.EncDatumRow, numKeyCols)
for i, columnID := range jr.desc.PrimaryIndex.ColumnIDs {
values[i] = row[jr.colIdxMap[columnID]]
}
key, err := sqlbase.MakeKeyFromEncDatums(
jr.primaryColumnTypes, values, &jr.desc, &jr.desc.PrimaryIndex, jr.primaryKeyPrefix,
&jr.alloc)
if err != nil {
return nil, err
}
keyToInputRowIdx[key.String()] = rowIdx
spans[rowIdx] = roachpb.Span{Key: key, EndKey: key.PrefixEnd()}
}
spans := []roachpb.Span{{Key: key, EndKey: key.PrefixEnd()}}
err = jr.primaryFetcher.StartScan(

// Perform the primary index scan.
err := jr.primaryFetcher.StartScan(
ctx, txn, spans, false /* limitBatches */, 0 /* limitHint */, false /* traceKV */)
if err != nil {
log.Errorf(ctx, "scan error: %s", err)
return nil, err
}
row, _, _, err := jr.primaryFetcher.NextRow(ctx)
if err != nil {
return nil, err
}
// Assert that the fetcher returns exactly one row.
if row == nil {
return nil, errors.New("expected exactly one row but found none")

// Iterate over the fetched rows and map them onto the input rows so we can
// return them in the same order.
for i := 0; i < batchSize; i++ {
key := jr.primaryFetcher.IndexKeyString(numKeyCols)
rowIdx, ok := keyToInputRowIdx[key]
if !ok {
return nil, errors.Errorf("failed to find key %v in keyToInputRowIdx %v", key, keyToInputRowIdx)
}
row, _, _, err := jr.primaryFetcher.NextRow(ctx)
if err != nil {
return nil, err
}
if row == nil {
return nil, errors.Errorf("expected %d rows but found %d", batchSize, i)
}
outRows[rowIdx] = jr.rowAlloc.CopyRow(row)
}

// Verify that we consumed all the fetched rows.
nextRow, _, _, err := jr.primaryFetcher.NextRow(ctx)
if err != nil {
return nil, err
}
if nextRow != nil {
return nil, errors.New("expected exactly one row but found multiple")
return nil, errors.Errorf("expected %d rows but found more", batchSize)
}
return row, nil

return outRows, nil
}

// Run is part of the processor interface.
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/distsqlrun/joinreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ func TestJoinReader(t *testing.T) {
t.Fatal(err)
}

// Set a lower batch size to force multiple batches.
jr.batchSize = 2

jr.Run(context.Background(), nil /* wg */)

if !in.Done {
Expand Down Expand Up @@ -342,6 +345,10 @@ INSERT INTO test.t VALUES
if err != nil {
t.Fatal(err)
}

// Set a lower batch size to force multiple batches.
jr.batchSize = 2

jr.Run(context.Background(), nil /* wg */)

// Check results.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type DistSQLVersion uint32
//
// ATTENTION: When updating these fields, add to version_history.txt explaining
// what changed.
const Version DistSQLVersion = 13
const Version DistSQLVersion = 14

// MinAcceptedVersion is the oldest version that the server is
// compatible with; see above.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/distsqlrun/version_history.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@
- Add NumRows to ValuesSpec (used for zero-column case). The implementation
tolerates this field being unset in existing planning cases (at lest one
column).
- Version: 14 (MinAcceptedVersion: 6)
- Enhancements to lookup joins. They now support joining against secondary
indexes as well as left outer joins. Left join support required two
additional fields on JoinReaderSpec: index_filter_expr and type.

0 comments on commit aa02b29

Please sign in to comment.