From a7497b886636eb72857f88b5392160cfaaa2fe21 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 19 Jan 2021 16:55:52 -0800 Subject: [PATCH] rowexec: minor joinreader cleanup Release note: None --- pkg/sql/rowexec/joinreader.go | 68 ++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 99312d665404..3aaef5c05b52 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -90,8 +90,7 @@ type joinReader struct { shouldLimitBatches bool readerType joinReaderType - input execinfra.RowSource - inputTypes []*types.T + input execinfra.RowSource // Column indexes in the input stream specifying the columns which match with // the index columns. These are the equality columns of the join. lookupCols []uint32 @@ -171,6 +170,9 @@ func newJoinReader( if spec.Type != descpb.InnerJoin { return nil, errors.AssertionFailedf("only inner index joins are supported, %s requested", spec.Type) } + if !spec.OnExpr.Empty() { + return nil, errors.AssertionFailedf("non-empty ON expressions are not supported for index joins") + } } var lookupCols []uint32 @@ -190,9 +192,13 @@ func newJoinReader( desc: tabledesc.MakeImmutable(spec.Table), maintainOrdering: spec.MaintainOrdering, input: input, - inputTypes: input.OutputTypes(), lookupCols: lookupCols, outputGroupContinuationForLeftRow: spec.OutputGroupContinuationForLeftRow, + // If the lookup columns form a key, there is only one result per + // lookup, so the fetcher should parallelize the key lookups it + // performs. + shouldLimitBatches: !spec.LookupColumnsAreKey && readerType == lookupJoinReaderType, + readerType: readerType, } if readerType != indexJoinReaderType { jr.groupingState = &inputBatchGroupingState{doGrouping: spec.LeftJoinWithPairedJoiner} @@ -213,19 +219,14 @@ func newJoinReader( indexCols[i] = uint32(columnID) } - // If the lookup columns form a key, there is only one result per lookup, so the fetcher - // should parallelize the key lookups it performs. - jr.shouldLimitBatches = !spec.LookupColumnsAreKey && readerType == lookupJoinReaderType - jr.readerType = readerType - // Add all requested system columns to the output. var sysColDescs []descpb.ColumnDescriptor if spec.HasSystemColumns { sysColDescs = colinfo.AllSystemColumnDescs - } - for i := range sysColDescs { - columnTypes = append(columnTypes, sysColDescs[i].Type) - jr.colIdxMap.Set(sysColDescs[i].ID, jr.colIdxMap.Len()) + for i := range sysColDescs { + columnTypes = append(columnTypes, sysColDescs[i].Type) + jr.colIdxMap.Set(sysColDescs[i].ID, jr.colIdxMap.Len()) + } } var leftTypes []*types.T @@ -276,26 +277,18 @@ func newJoinReader( collectingStats = true } - neededRightCols := jr.neededRightCols() - set, err := getIndexColSet(jr.index, jr.colIdxMap) - if err != nil { - return nil, err - } - if isSecondary && !neededRightCols.SubsetOf(set) { - return nil, errors.Errorf("joinreader index does not cover all columns") + rightCols := jr.neededRightCols() + if isSecondary { + set, err := getIndexColSet(jr.index, jr.colIdxMap) + if err != nil { + return nil, err + } + if !rightCols.SubsetOf(set) { + return nil, errors.Errorf("joinreader index does not cover all columns") + } } var fetcher row.Fetcher - var rightCols util.FastIntSet - switch readerType { - case indexJoinReaderType: - rightCols = jr.Out.NeededColumns() - case lookupJoinReaderType: - rightCols = neededRightCols - default: - return nil, errors.Errorf("unsupported joinReaderType") - } - _, _, err = initRowFetcher( flowCtx, &fetcher, &jr.desc, int(spec.IndexIdx), jr.colIdxMap, false, /* reverse */ rightCols, false /* isCheck */, jr.EvalCtx.Mon, &jr.alloc, spec.Visibility, spec.LockingStrength, @@ -420,22 +413,31 @@ func (jr *joinReader) Spilled() bool { func (jr *joinReader) neededRightCols() util.FastIntSet { neededCols := jr.Out.NeededColumns() + if jr.readerType == indexJoinReaderType { + // For index joins, all columns from the left side are not output, so no + // shift is needed. Also, onCond is always empty for index joins, so + // there is no need to iterate over it either. + return neededCols + } + // Get the columns from the right side of the join and shift them over by // the size of the left side so the right side starts at 0. + numInputTypes := len(jr.input.OutputTypes()) neededRightCols := util.MakeFastIntSet() var lastCol int - for i, ok := neededCols.Next(len(jr.inputTypes)); ok; i, ok = neededCols.Next(i + 1) { - lastCol = i - len(jr.inputTypes) + for i, ok := neededCols.Next(numInputTypes); ok; i, ok = neededCols.Next(i + 1) { + lastCol = i - numInputTypes neededRightCols.Add(lastCol) } if jr.outputGroupContinuationForLeftRow { - // The lastCol is the bool continuation column and not a right column. + // The lastCol is the bool continuation column and not a right + // column. neededRightCols.Remove(lastCol) } // Add columns needed by OnExpr. for _, v := range jr.onCond.Vars.GetIndexedVars() { - rightIdx := v.Idx - len(jr.inputTypes) + rightIdx := v.Idx - numInputTypes if rightIdx >= 0 { neededRightCols.Add(rightIdx) }