Skip to content

Commit

Permalink
rowexec: minor joinreader cleanup
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
yuzefovich committed Jan 20, 2021
1 parent 7ee466f commit a7497b8
Showing 1 changed file with 35 additions and 33 deletions.
68 changes: 35 additions & 33 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ type joinReader struct {
shouldLimitBatches bool
readerType joinReaderType

input execinfra.RowSource
inputTypes []*types.T
input execinfra.RowSource
// Column indexes in the input stream specifying the columns which match with
// the index columns. These are the equality columns of the join.
lookupCols []uint32
Expand Down Expand Up @@ -171,6 +170,9 @@ func newJoinReader(
if spec.Type != descpb.InnerJoin {
return nil, errors.AssertionFailedf("only inner index joins are supported, %s requested", spec.Type)
}
if !spec.OnExpr.Empty() {
return nil, errors.AssertionFailedf("non-empty ON expressions are not supported for index joins")
}
}

var lookupCols []uint32
Expand All @@ -190,9 +192,13 @@ func newJoinReader(
desc: tabledesc.MakeImmutable(spec.Table),
maintainOrdering: spec.MaintainOrdering,
input: input,
inputTypes: input.OutputTypes(),
lookupCols: lookupCols,
outputGroupContinuationForLeftRow: spec.OutputGroupContinuationForLeftRow,
// If the lookup columns form a key, there is only one result per
// lookup, so the fetcher should parallelize the key lookups it
// performs.
shouldLimitBatches: !spec.LookupColumnsAreKey && readerType == lookupJoinReaderType,
readerType: readerType,
}
if readerType != indexJoinReaderType {
jr.groupingState = &inputBatchGroupingState{doGrouping: spec.LeftJoinWithPairedJoiner}
Expand All @@ -213,19 +219,14 @@ func newJoinReader(
indexCols[i] = uint32(columnID)
}

// If the lookup columns form a key, there is only one result per lookup, so the fetcher
// should parallelize the key lookups it performs.
jr.shouldLimitBatches = !spec.LookupColumnsAreKey && readerType == lookupJoinReaderType
jr.readerType = readerType

// Add all requested system columns to the output.
var sysColDescs []descpb.ColumnDescriptor
if spec.HasSystemColumns {
sysColDescs = colinfo.AllSystemColumnDescs
}
for i := range sysColDescs {
columnTypes = append(columnTypes, sysColDescs[i].Type)
jr.colIdxMap.Set(sysColDescs[i].ID, jr.colIdxMap.Len())
for i := range sysColDescs {
columnTypes = append(columnTypes, sysColDescs[i].Type)
jr.colIdxMap.Set(sysColDescs[i].ID, jr.colIdxMap.Len())
}
}

var leftTypes []*types.T
Expand Down Expand Up @@ -276,26 +277,18 @@ func newJoinReader(
collectingStats = true
}

neededRightCols := jr.neededRightCols()
set, err := getIndexColSet(jr.index, jr.colIdxMap)
if err != nil {
return nil, err
}
if isSecondary && !neededRightCols.SubsetOf(set) {
return nil, errors.Errorf("joinreader index does not cover all columns")
rightCols := jr.neededRightCols()
if isSecondary {
set, err := getIndexColSet(jr.index, jr.colIdxMap)
if err != nil {
return nil, err
}
if !rightCols.SubsetOf(set) {
return nil, errors.Errorf("joinreader index does not cover all columns")
}
}

var fetcher row.Fetcher
var rightCols util.FastIntSet
switch readerType {
case indexJoinReaderType:
rightCols = jr.Out.NeededColumns()
case lookupJoinReaderType:
rightCols = neededRightCols
default:
return nil, errors.Errorf("unsupported joinReaderType")
}

_, _, err = initRowFetcher(
flowCtx, &fetcher, &jr.desc, int(spec.IndexIdx), jr.colIdxMap, false, /* reverse */
rightCols, false /* isCheck */, jr.EvalCtx.Mon, &jr.alloc, spec.Visibility, spec.LockingStrength,
Expand Down Expand Up @@ -420,22 +413,31 @@ func (jr *joinReader) Spilled() bool {
func (jr *joinReader) neededRightCols() util.FastIntSet {
neededCols := jr.Out.NeededColumns()

if jr.readerType == indexJoinReaderType {
// For index joins, all columns from the left side are not output, so no
// shift is needed. Also, onCond is always empty for index joins, so
// there is no need to iterate over it either.
return neededCols
}

// Get the columns from the right side of the join and shift them over by
// the size of the left side so the right side starts at 0.
numInputTypes := len(jr.input.OutputTypes())
neededRightCols := util.MakeFastIntSet()
var lastCol int
for i, ok := neededCols.Next(len(jr.inputTypes)); ok; i, ok = neededCols.Next(i + 1) {
lastCol = i - len(jr.inputTypes)
for i, ok := neededCols.Next(numInputTypes); ok; i, ok = neededCols.Next(i + 1) {
lastCol = i - numInputTypes
neededRightCols.Add(lastCol)
}
if jr.outputGroupContinuationForLeftRow {
// The lastCol is the bool continuation column and not a right column.
// The lastCol is the bool continuation column and not a right
// column.
neededRightCols.Remove(lastCol)
}

// Add columns needed by OnExpr.
for _, v := range jr.onCond.Vars.GetIndexedVars() {
rightIdx := v.Idx - len(jr.inputTypes)
rightIdx := v.Idx - numInputTypes
if rightIdx >= 0 {
neededRightCols.Add(rightIdx)
}
Expand Down

0 comments on commit a7497b8

Please sign in to comment.