diff --git a/pkg/sql/distsqlrun/joinreader.go b/pkg/sql/distsqlrun/joinreader.go index a8e739ae037d..641c628351e7 100644 --- a/pkg/sql/distsqlrun/joinreader.go +++ b/pkg/sql/distsqlrun/joinreader.go @@ -53,8 +53,9 @@ type joinReader struct { index *sqlbase.IndexDescriptor colIdxMap map[sqlbase.ColumnID]int - fetcher sqlbase.RowFetcher - alloc sqlbase.DatumAlloc + fetcher sqlbase.RowFetcher + alloc sqlbase.DatumAlloc + rowAlloc sqlbase.EncDatumRowAlloc input RowSource inputTypes []sqlbase.ColumnType @@ -297,7 +298,6 @@ func (jr *joinReader) mainLoop(ctx context.Context) error { defer log.Infof(ctx, "exiting") } - spanToRowIndices := make(map[string][]int) for { // TODO(radu): figure out how to send smaller batches if the source has // a soft limit (perhaps send the batch out if we don't get a result @@ -305,6 +305,7 @@ func (jr *joinReader) mainLoop(ctx context.Context) error { rowIdx := 0 rows = rows[:0] spans = spans[:0] + spanToRowIndices := make(map[string][]int, joinReaderBatchSize) for len(spans) < joinReaderBatchSize { row, meta := jr.input.Next() if meta != nil { @@ -350,7 +351,14 @@ func (jr *joinReader) mainLoop(ctx context.Context) error { // TODO(radu): we are consuming all results from a fetch before starting // the next batch. We could start the next batch early while we are // outputting rows. - if earlyExit, err := jr.indexLookup(ctx, txn, spans, rows, spanToRowIndices); err != nil { + var earlyExit bool + var err error + if jr.isLookupJoin() { + earlyExit, err = jr.lookupJoinLookup(ctx, txn, spans, rows, spanToRowIndices) + } else { + earlyExit, err = jr.indexJoinLookup(ctx, txn, spans) + } + if err != nil { return err } else if earlyExit { return nil @@ -371,18 +379,55 @@ func (jr *joinReader) isLookupJoin() bool { return len(jr.lookupCols) > 0 } -// Index lookup iterates through all matches of the given spans and emits the -// corresponding row. +// indexJoinLookup performs an index lookup for the purposes of an index join. +// It fetches the specified spans from the primary index and emits the results. // -// For lookup joins, the `rows` and `spanToRowIndices` arguments must be -// provided. `rows` is the input rows which correspond to the specified spans. -// `spanToRowIndices` maps span keys onto the indices of the corresponding rows -// in the `rows` slice. For index joins, these arguments are ignored. +// Returns false if more rows need to be produced, true otherwise. If true is +// returned, both the inputs and the output have been drained and closed, except +// if an error is returned. +func (jr *joinReader) indexJoinLookup( + ctx context.Context, txn *client.Txn, spans roachpb.Spans, +) (bool, error) { + // TODO(radu,andrei,knz): set the traceKV flag when requested by the session. + err := jr.fetcher.StartScan( + ctx, txn, spans, false /* limitBatches */, 0 /* limitHint */, false /* traceKV */) + if err != nil { + log.Errorf(ctx, "scan error: %s", err) + return true, err + } + for { + row, _, _, err := jr.fetcher.NextRow(ctx) + if err != nil { + err = scrub.UnwrapScrubError(err) + return true, err + } + if row == nil { + // Done with this batch. + break + } + // Emit the row; stop if no more rows are needed. + if !emitHelper(ctx, &jr.out, row, nil /* meta */, jr.pushTrailingMeta, jr.input) { + return true, nil + } + } + return false, nil +} + +// lookupJoinLookup performs an index lookup for the purposes of a lookup join. +// `spans` is the set of spans which should be fetched from the index. `rows` is +// the corresponding input rows. `spanToRowIndices` maps span keys onto the +// corresponding `rows` indices. +// +// Note that if jr.primaryFetcher is non-nil, this function will perform two +// lookups: one to fetch rows from a secondary index, and a second to fetch the +// corresponding primary rows. This is for lookup joins on a secondary index +// which does not cover all the needed output columns. (Due to batching it may +// actually perform multiple primary lookups.) // // Returns false if more rows need to be produced, true otherwise. If true is // returned, both the inputs and the output have been drained and closed, except // if an error is returned. -func (jr *joinReader) indexLookup( +func (jr *joinReader) lookupJoinLookup( ctx context.Context, txn *client.Txn, spans roachpb.Spans, @@ -391,13 +436,7 @@ func (jr *joinReader) indexLookup( ) (bool, error) { // TODO(radu,andrei,knz): set the traceKV flag when requested by the session. err := jr.fetcher.StartScan( - ctx, - txn, - spans, - false, /* no batch limits */ - 0, - false, /* traceKV */ - ) + ctx, txn, spans, false /* limitBatches */, 0 /* limitHint */, false /* traceKV */) if err != nil { log.Errorf(ctx, "scan error: %s", err) return true, err @@ -413,35 +452,54 @@ func (jr *joinReader) indexLookup( emitted = make([]bool, len(rows)) } + // lookupRow represents an index key and the corresponding row. + type lookupRow = struct { + key string + row sqlbase.EncDatumRow + } + lookupRows := make([]lookupRow, 0, joinReaderBatchSize) + + // The index scan may have returned more rows than len(spans), so process the + // results in batches. for { - indexKey := jr.fetcher.IndexKeyString(len(jr.lookupCols)) - indexRow, _, _, err := jr.fetcher.NextRow(ctx) - if err != nil { - err = scrub.UnwrapScrubError(err) - return true, err - } - if indexRow == nil { - // Done with this batch. - break + // Get the next batch of lookup results. + lookupRows = lookupRows[:0] + for len(lookupRows) < joinReaderBatchSize { + key := jr.fetcher.IndexKeyString(len(jr.lookupCols)) + row, _, _, err := jr.fetcher.NextRow(ctx) + if err != nil { + err = scrub.UnwrapScrubError(err) + return true, err + } + if row == nil { + // Done with this batch. + break + } + lookupRows = append(lookupRows, lookupRow{key, jr.rowAlloc.CopyRow(row)}) } - rowIndices := spanToRowIndices[indexKey] - - if !jr.isLookupJoin() { - // Emit the row; stop if no more rows are needed. - if !emitHelper(ctx, &jr.out, indexRow, nil /* meta */, jr.pushTrailingMeta, jr.input) { - return true, nil + if jr.primaryFetcher != nil { + // The lookup was on a non-covering secondary index, so we need to do a + // second lookup against the primary index and replace our previous + // results with the primary rows. + secondaryIndexRows := make([]sqlbase.EncDatumRow, len(lookupRows)) + for i := range lookupRows { + secondaryIndexRows[i] = lookupRows[i].row } - } else { - if jr.primaryFetcher != nil { - // TODO(solon): Batch these primary index scans. - indexRow, err = jr.primaryLookup(ctx, txn, indexRow) - if err != nil { - return false, err - } + rows, err := jr.primaryLookup(ctx, txn, secondaryIndexRows) + if err != nil { + return false, err + } + for i := range rows { + lookupRows[i].row = rows[i] } - for _, rowIdx := range rowIndices { - renderedRow, err := jr.render(rows[rowIdx], indexRow) + } + + // Iterate over the lookup results, map them to the input rows, and emit the + // rendered rows. + for _, lookupRow := range lookupRows { + for _, rowIdx := range spanToRowIndices[lookupRow.key] { + renderedRow, err := jr.render(rows[rowIdx], lookupRow.row) if err != nil { return false, err } @@ -460,6 +518,11 @@ func (jr *joinReader) indexLookup( } } } + + if len(lookupRows) < joinReaderBatchSize { + // This was the last batch. + break + } } if emitted != nil { @@ -478,44 +541,75 @@ func (jr *joinReader) indexLookup( return false, nil } -// primaryLookup looks up the corresponding primary index row, given a secondary -// index row. +// primaryLookup looks up the corresponding primary index rows, given a batch of +// secondary index rows. Since we expect a 1-1 correspondence between the input +// and output, it returns a slice of rows where each row corresponds to the +// input with the same slice index. func (jr *joinReader) primaryLookup( - ctx context.Context, txn *client.Txn, secondaryIndexRow sqlbase.EncDatumRow, -) (sqlbase.EncDatumRow, error) { - index := jr.desc.PrimaryIndex - keyValues := make(sqlbase.EncDatumRow, len(index.ColumnIDs)) - for i, columnID := range index.ColumnIDs { - keyValues[i] = secondaryIndexRow[jr.colIdxMap[columnID]] + ctx context.Context, txn *client.Txn, secondaryIndexRows []sqlbase.EncDatumRow, +) ([]sqlbase.EncDatumRow, error) { + batchSize := len(secondaryIndexRows) + if batchSize == 0 { + return secondaryIndexRows, nil } - key, err := sqlbase.MakeKeyFromEncDatums( - jr.primaryColumnTypes, keyValues, &jr.desc, &index, jr.primaryKeyPrefix, &jr.alloc) - if err != nil { - return nil, err + numKeyCols := len(jr.desc.PrimaryIndex.ColumnIDs) + // keyToInputRowIdx maps primary index keys to the input rows. + keyToInputRowIdx := make(map[string]int, batchSize) + outRows := make([]sqlbase.EncDatumRow, batchSize) + + // Build spans for the primary index lookup. + spans := make([]roachpb.Span, batchSize) + for rowIdx, row := range secondaryIndexRows { + values := make(sqlbase.EncDatumRow, numKeyCols) + for i, columnID := range jr.desc.PrimaryIndex.ColumnIDs { + values[i] = row[jr.colIdxMap[columnID]] + } + key, err := sqlbase.MakeKeyFromEncDatums( + jr.primaryColumnTypes, values, &jr.desc, &jr.desc.PrimaryIndex, jr.primaryKeyPrefix, + &jr.alloc) + if err != nil { + return nil, err + } + keyToInputRowIdx[key.String()] = rowIdx + spans[rowIdx] = roachpb.Span{key, key.PrefixEnd()} } - spans := []roachpb.Span{{Key: key, EndKey: key.PrefixEnd()}} - err = jr.primaryFetcher.StartScan( + + // Perform the primary index scan. + err := jr.primaryFetcher.StartScan( ctx, txn, spans, false /* limitBatches */, 0 /* limitHint */, false /* traceKV */) if err != nil { log.Errorf(ctx, "scan error: %s", err) return nil, err } - row, _, _, err := jr.primaryFetcher.NextRow(ctx) - if err != nil { - return nil, err - } - // Assert that the fetcher returns exactly one row. - if row == nil { - return nil, errors.New("expected exactly one row but found none") + + // Iterate over the fetched rows and map them onto the input rows so we can + // return them in the same order. + for i := 0; i < batchSize; i++ { + key := jr.primaryFetcher.IndexKeyString(numKeyCols) + rowIdx, ok := keyToInputRowIdx[key] + if !ok { + return nil, errors.Errorf("failed to find key %v in keyToInputRowIdx %v", key, keyToInputRowIdx) + } + row, _, _, err := jr.primaryFetcher.NextRow(ctx) + if err != nil { + return nil, err + } + if row == nil { + return nil, errors.Errorf("expected %d rows but found %d", batchSize, i) + } + outRows[rowIdx] = jr.rowAlloc.CopyRow(row) } + + // Verify that we consumed all the fetched rows. nextRow, _, _, err := jr.primaryFetcher.NextRow(ctx) if err != nil { return nil, err } if nextRow != nil { - return nil, errors.New("expected exactly one row but found multiple") + return nil, errors.Errorf("expected %d rows but found more", batchSize) } - return row, nil + + return outRows, nil } // Run is part of the processor interface.