Skip to content

Commit

Permalink
distsql: add batching for secondary lookup joins
Browse files Browse the repository at this point in the history
Lookup joins on non-covering secondary indexes were previously making a
separate primary index scan for every secondary index row. Now those
scans are grouped together into batches of up to 100 spans.

Release note: None
  • Loading branch information
solongordon committed May 22, 2018
1 parent 5caa34d commit 1533557
Showing 1 changed file with 159 additions and 65 deletions.
224 changes: 159 additions & 65 deletions pkg/sql/distsqlrun/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ type joinReader struct {
index *sqlbase.IndexDescriptor
colIdxMap map[sqlbase.ColumnID]int

fetcher sqlbase.RowFetcher
alloc sqlbase.DatumAlloc
fetcher sqlbase.RowFetcher
alloc sqlbase.DatumAlloc
rowAlloc sqlbase.EncDatumRowAlloc

input RowSource
inputTypes []sqlbase.ColumnType
Expand Down Expand Up @@ -297,14 +298,14 @@ func (jr *joinReader) mainLoop(ctx context.Context) error {
defer log.Infof(ctx, "exiting")
}

spanToRowIndices := make(map[string][]int)
for {
// TODO(radu): figure out how to send smaller batches if the source has
// a soft limit (perhaps send the batch out if we don't get a result
// within a certain amount of time).
rowIdx := 0
rows = rows[:0]
spans = spans[:0]
spanToRowIndices := make(map[string][]int, joinReaderBatchSize)
for len(spans) < joinReaderBatchSize {
row, meta := jr.input.Next()
if meta != nil {
Expand Down Expand Up @@ -350,7 +351,14 @@ func (jr *joinReader) mainLoop(ctx context.Context) error {
// TODO(radu): we are consuming all results from a fetch before starting
// the next batch. We could start the next batch early while we are
// outputting rows.
if earlyExit, err := jr.indexLookup(ctx, txn, spans, rows, spanToRowIndices); err != nil {
var earlyExit bool
var err error
if jr.isLookupJoin() {
earlyExit, err = jr.lookupJoinLookup(ctx, txn, spans, rows, spanToRowIndices)
} else {
earlyExit, err = jr.indexJoinLookup(ctx, txn, spans)
}
if err != nil {
return err
} else if earlyExit {
return nil
Expand All @@ -371,18 +379,55 @@ func (jr *joinReader) isLookupJoin() bool {
return len(jr.lookupCols) > 0
}

// Index lookup iterates through all matches of the given spans and emits the
// corresponding row.
// indexJoinLookup performs an index lookup for the purposes of an index join.
// It fetches the specified spans from the primary index and emits the results.
//
// For lookup joins, the `rows` and `spanToRowIndices` arguments must be
// provided. `rows` is the input rows which correspond to the specified spans.
// `spanToRowIndices` maps span keys onto the indices of the corresponding rows
// in the `rows` slice. For index joins, these arguments are ignored.
// Returns false if more rows need to be produced, true otherwise. If true is
// returned, both the inputs and the output have been drained and closed, except
// if an error is returned.
func (jr *joinReader) indexJoinLookup(
	ctx context.Context, txn *client.Txn, spans roachpb.Spans,
) (bool, error) {
	// Kick off the scan over the requested spans.
	// TODO(radu,andrei,knz): set the traceKV flag when requested by the session.
	if scanErr := jr.fetcher.StartScan(
		ctx, txn, spans, false /* limitBatches */, 0 /* limitHint */, false /* traceKV */,
	); scanErr != nil {
		log.Errorf(ctx, "scan error: %s", scanErr)
		return true, scanErr
	}

	// Forward every fetched row to the output until the scan runs dry or the
	// consumer signals that it does not need any more rows.
	for {
		fetched, _, _, fetchErr := jr.fetcher.NextRow(ctx)
		switch {
		case fetchErr != nil:
			return true, scrub.UnwrapScrubError(fetchErr)
		case fetched == nil:
			// A nil row marks the end of this batch; report that more rows may
			// still need to be produced.
			return false, nil
		}

		// Emit the row; stop if no more rows are needed.
		needMore := emitHelper(
			ctx, &jr.out, fetched, nil /* meta */, jr.pushTrailingMeta, jr.input)
		if !needMore {
			return true, nil
		}
	}
}

// lookupJoinLookup performs an index lookup for the purposes of a lookup join.
// `spans` is the set of spans which should be fetched from the index. `rows` is
// the corresponding input rows. `spanToRowIndices` maps span keys onto the
// corresponding `rows` indices.
//
// Note that if jr.primaryFetcher is non-nil, this function will perform two
// lookups: one to fetch rows from a secondary index, and a second to fetch the
// corresponding primary rows. This is for lookup joins on a secondary index
// which does not cover all the needed output columns. (Due to batching it may
// actually perform multiple primary lookups.)
//
// Returns false if more rows need to be produced, true otherwise. If true is
// returned, both the inputs and the output have been drained and closed, except
// if an error is returned.
func (jr *joinReader) indexLookup(
func (jr *joinReader) lookupJoinLookup(
ctx context.Context,
txn *client.Txn,
spans roachpb.Spans,
Expand All @@ -391,13 +436,7 @@ func (jr *joinReader) indexLookup(
) (bool, error) {
// TODO(radu,andrei,knz): set the traceKV flag when requested by the session.
err := jr.fetcher.StartScan(
ctx,
txn,
spans,
false, /* no batch limits */
0,
false, /* traceKV */
)
ctx, txn, spans, false /* limitBatches */, 0 /* limitHint */, false /* traceKV */)
if err != nil {
log.Errorf(ctx, "scan error: %s", err)
return true, err
Expand All @@ -413,35 +452,54 @@ func (jr *joinReader) indexLookup(
emitted = make([]bool, len(rows))
}

// lookupRow represents an index key and the corresponding row.
type lookupRow = struct {
key string
row sqlbase.EncDatumRow
}
lookupRows := make([]lookupRow, 0, joinReaderBatchSize)

// The index scan may have returned more rows than len(spans), so process the
// results in batches.
for {
indexKey := jr.fetcher.IndexKeyString(len(jr.lookupCols))
indexRow, _, _, err := jr.fetcher.NextRow(ctx)
if err != nil {
err = scrub.UnwrapScrubError(err)
return true, err
}
if indexRow == nil {
// Done with this batch.
break
// Get the next batch of lookup results.
lookupRows = lookupRows[:0]
for len(lookupRows) < joinReaderBatchSize {
key := jr.fetcher.IndexKeyString(len(jr.lookupCols))
row, _, _, err := jr.fetcher.NextRow(ctx)
if err != nil {
err = scrub.UnwrapScrubError(err)
return true, err
}
if row == nil {
// Done with this batch.
break
}
lookupRows = append(lookupRows, lookupRow{key, jr.rowAlloc.CopyRow(row)})
}

rowIndices := spanToRowIndices[indexKey]

if !jr.isLookupJoin() {
// Emit the row; stop if no more rows are needed.
if !emitHelper(ctx, &jr.out, indexRow, nil /* meta */, jr.pushTrailingMeta, jr.input) {
return true, nil
if jr.primaryFetcher != nil {
// The lookup was on a non-covering secondary index, so we need to do a
// second lookup against the primary index and replace our previous
// results with the primary rows.
secondaryIndexRows := make([]sqlbase.EncDatumRow, len(lookupRows))
for i := range lookupRows {
secondaryIndexRows[i] = lookupRows[i].row
}
} else {
if jr.primaryFetcher != nil {
// TODO(solon): Batch these primary index scans.
indexRow, err = jr.primaryLookup(ctx, txn, indexRow)
if err != nil {
return false, err
}
rows, err := jr.primaryLookup(ctx, txn, secondaryIndexRows)
if err != nil {
return false, err
}
for i := range rows {
lookupRows[i].row = rows[i]
}
for _, rowIdx := range rowIndices {
renderedRow, err := jr.render(rows[rowIdx], indexRow)
}

// Iterate over the lookup results, map them to the input rows, and emit the
// rendered rows.
for _, lookupRow := range lookupRows {
for _, rowIdx := range spanToRowIndices[lookupRow.key] {
renderedRow, err := jr.render(rows[rowIdx], lookupRow.row)
if err != nil {
return false, err
}
Expand All @@ -460,6 +518,11 @@ func (jr *joinReader) indexLookup(
}
}
}

if len(lookupRows) < joinReaderBatchSize {
// This was the last batch.
break
}
}

if emitted != nil {
Expand All @@ -478,44 +541,75 @@ func (jr *joinReader) indexLookup(
return false, nil
}

// primaryLookup looks up the corresponding primary index row, given a secondary
// index row.
// primaryLookup looks up the corresponding primary index rows, given a batch of
// secondary index rows. Since we expect a 1-1 correspondence between the input
// and output, it returns a slice of rows where each row corresponds to the
// input with the same slice index.
func (jr *joinReader) primaryLookup(
ctx context.Context, txn *client.Txn, secondaryIndexRow sqlbase.EncDatumRow,
) (sqlbase.EncDatumRow, error) {
index := jr.desc.PrimaryIndex
keyValues := make(sqlbase.EncDatumRow, len(index.ColumnIDs))
for i, columnID := range index.ColumnIDs {
keyValues[i] = secondaryIndexRow[jr.colIdxMap[columnID]]
ctx context.Context, txn *client.Txn, secondaryIndexRows []sqlbase.EncDatumRow,
) ([]sqlbase.EncDatumRow, error) {
batchSize := len(secondaryIndexRows)
if batchSize == 0 {
return secondaryIndexRows, nil
}
key, err := sqlbase.MakeKeyFromEncDatums(
jr.primaryColumnTypes, keyValues, &jr.desc, &index, jr.primaryKeyPrefix, &jr.alloc)
if err != nil {
return nil, err
numKeyCols := len(jr.desc.PrimaryIndex.ColumnIDs)
// keyToInputRowIdx maps primary index keys to the input rows.
keyToInputRowIdx := make(map[string]int, batchSize)
outRows := make([]sqlbase.EncDatumRow, batchSize)

// Build spans for the primary index lookup.
spans := make([]roachpb.Span, batchSize)
for rowIdx, row := range secondaryIndexRows {
values := make(sqlbase.EncDatumRow, numKeyCols)
for i, columnID := range jr.desc.PrimaryIndex.ColumnIDs {
values[i] = row[jr.colIdxMap[columnID]]
}
key, err := sqlbase.MakeKeyFromEncDatums(
jr.primaryColumnTypes, values, &jr.desc, &jr.desc.PrimaryIndex, jr.primaryKeyPrefix,
&jr.alloc)
if err != nil {
return nil, err
}
keyToInputRowIdx[key.String()] = rowIdx
spans[rowIdx] = roachpb.Span{key, key.PrefixEnd()}
}
spans := []roachpb.Span{{Key: key, EndKey: key.PrefixEnd()}}
err = jr.primaryFetcher.StartScan(

// Perform the primary index scan.
err := jr.primaryFetcher.StartScan(
ctx, txn, spans, false /* limitBatches */, 0 /* limitHint */, false /* traceKV */)
if err != nil {
log.Errorf(ctx, "scan error: %s", err)
return nil, err
}
row, _, _, err := jr.primaryFetcher.NextRow(ctx)
if err != nil {
return nil, err
}
// Assert that the fetcher returns exactly one row.
if row == nil {
return nil, errors.New("expected exactly one row but found none")

// Iterate over the fetched rows and map them onto the input rows so we can
// return them in the same order.
for i := 0; i < batchSize; i++ {
key := jr.primaryFetcher.IndexKeyString(numKeyCols)
rowIdx, ok := keyToInputRowIdx[key]
if !ok {
return nil, errors.Errorf("failed to find key %v in keyToInputRowIdx %v", key, keyToInputRowIdx)
}
row, _, _, err := jr.primaryFetcher.NextRow(ctx)
if err != nil {
return nil, err
}
if row == nil {
return nil, errors.Errorf("expected %d rows but found %d", batchSize, i)
}
outRows[rowIdx] = jr.rowAlloc.CopyRow(row)
}

// Verify that we consumed all the fetched rows.
nextRow, _, _, err := jr.primaryFetcher.NextRow(ctx)
if err != nil {
return nil, err
}
if nextRow != nil {
return nil, errors.New("expected exactly one row but found multiple")
return nil, errors.Errorf("expected %d rows but found more", batchSize)
}
return row, nil

return outRows, nil
}

// Run is part of the processor interface.
Expand Down

0 comments on commit 1533557

Please sign in to comment.