diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 35709db8b38c..9728adc9db64 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -109,6 +109,7 @@ var retiredSettings = map[string]struct{}{ "sql.telemetry.query_sampling.sample_rate": {}, "diagnostics.sql_stat_reset.interval": {}, "changefeed.mem.pushback_enabled": {}, + "sql.distsql.index_join_limit_hint.enabled": {}, // removed as of 22.1. "sql.defaults.drop_enum_value.enabled": {}, diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 4fa38c8c6956..5208070afdb2 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -762,7 +762,7 @@ func NewColOperator( ctx, getStreamingAllocator(ctx, args), colmem.NewAllocator(ctx, cFetcherMemAcc, factory), kvFetcherMemAcc, streamerBudgetAcc, flowCtx, - inputs[0].Root, core.JoinReader, inputTypes, streamerDiskMonitor, + inputs[0].Root, core.JoinReader, post, inputTypes, streamerDiskMonitor, ) if err != nil { return r, err diff --git a/pkg/sql/colexec/colexecjoin/crossjoiner.eg.go b/pkg/sql/colexec/colexecjoin/crossjoiner.eg.go index 7514686a8d94..739b37a44b94 100644 --- a/pkg/sql/colexec/colexecjoin/crossjoiner.eg.go +++ b/pkg/sql/colexec/colexecjoin/crossjoiner.eg.go @@ -112,14 +112,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -138,8 +140,7 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i if srcNulls.NullAt(srcStartIdx) { outNulls.SetNull(outStartIdx) } else { - val := srcCol.Get(srcStartIdx) - outCol.Set(outStartIdx, val) + outCol.Copy(srcCol, outStartIdx, srcStartIdx) } outStartIdx++ bs.curSrcStartIdx++ @@ -168,14 +169,12 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { - val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + outCol.Copy(srcCol, outStartIdx+i, srcStartIdx) } } + outStartIdx += toAppend } } } @@ -224,14 +223,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -279,14 +280,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } case 32: @@ -331,14 +334,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } case -1: @@ -384,14 +389,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -440,14 +447,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -496,14 +505,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -552,14 +563,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -578,8 +591,7 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i if srcNulls.NullAt(srcStartIdx) { outNulls.SetNull(outStartIdx) } else { - val := srcCol.Get(srcStartIdx) - outCol.Set(outStartIdx, val) + outCol.Copy(srcCol, outStartIdx, srcStartIdx) } outStartIdx++ bs.curSrcStartIdx++ @@ -608,14 +620,12 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { - val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + outCol.Copy(srcCol, outStartIdx+i, srcStartIdx) } } + outStartIdx += toAppend } } } @@ -664,14 +674,13 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + outCol.Set(outStartIdx+i, val) } } + outStartIdx += toAppend } } } @@ -751,14 +760,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -777,8 +788,7 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i if srcNulls.NullAt(srcStartIdx) { outNulls.SetNull(outStartIdx) } else { - val := srcCol.Get(srcStartIdx) - outCol.Set(outStartIdx, val) + outCol.Copy(srcCol, outStartIdx, srcStartIdx) } outStartIdx++ bs.curSrcStartIdx++ @@ -807,14 +817,12 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { - val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + outCol.Copy(srcCol, outStartIdx+i, srcStartIdx) } } + outStartIdx += toAppend } } } @@ -863,14 +871,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -918,14 +928,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } case 32: @@ -970,14 +982,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } case -1: @@ -1023,14 +1037,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -1079,14 +1095,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -1135,14 +1153,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -1191,14 +1211,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -1217,8 +1239,7 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i if srcNulls.NullAt(srcStartIdx) { outNulls.SetNull(outStartIdx) } else { - val := srcCol.Get(srcStartIdx) - outCol.Set(outStartIdx, val) + outCol.Copy(srcCol, outStartIdx, srcStartIdx) } outStartIdx++ bs.curSrcStartIdx++ @@ -1247,14 +1268,12 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { - val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + outCol.Copy(srcCol, outStartIdx+i, srcStartIdx) } } + outStartIdx += toAppend } } } @@ -1303,14 +1322,13 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + outCol.Set(outStartIdx+i, val) } } + outStartIdx += toAppend } } } diff --git a/pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go b/pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go index e01a6da7316e..0f2dbf86c02c 100644 --- a/pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go +++ b/pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go @@ -87,8 +87,12 @@ func buildFromLeftBatch(b *crossJoinerBase, currentBatch coldata.Batch, sel []in if srcNulls.NullAt(srcStartIdx) { outNulls.SetNull(outStartIdx) } else { + // {{if .IsBytesLike}} + outCol.Copy(srcCol, outStartIdx, srcStartIdx) + // {{else}} val := srcCol.Get(srcStartIdx) outCol.Set(outStartIdx, val) + // {{end}} } outStartIdx++ bs.curSrcStartIdx++ @@ -102,6 +106,7 @@ func buildFromLeftBatch(b *crossJoinerBase, currentBatch coldata.Batch, sel []in } else { srcStartIdx = bs.curSrcStartIdx } + // {{/* toAppend will always be positive. */}} toAppend := leftNumRepeats - bs.numRepeatsIdx if outStartIdx+toAppend > outputCapacity { // We don't have enough space to repeat the current @@ -120,14 +125,40 @@ func buildFromLeftBatch(b *crossJoinerBase, currentBatch coldata.Batch, sel []in } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + // {{if not .IsBytesLike}} + // {{if .Sliceable}} + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] + // {{end}} val := srcCol.Get(srcStartIdx) + // {{end}} for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + // {{if .IsBytesLike}} + outCol.Copy(srcCol, outStartIdx+i, srcStartIdx) + // {{else}} + // {{if .Sliceable}} + // {{/* + // For the sliceable types, we sliced outCol + // to start at outStartIdx, so we use index + // i directly. + // */}} + //gcassert:bce + outCol.Set(i, val) + // {{else}} + // {{/* + // For the non-sliceable types, outCol + // vector is the original one (i.e. without + // an adjustment), so we need to add + // outStartIdx to set the element at the + // correct index. + // */}} + outCol.Set(outStartIdx+i, val) + // {{end}} + // {{end}} } } + outStartIdx += toAppend } } // {{end}} diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index f4bae88f3c17..415cdcb4c65e 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -62,6 +62,10 @@ type ColIndexJoin struct { // and may not correspond to batch boundaries. startIdx int + // limitHintHelper is used in limiting batches of input rows in the presence + // of hard and soft limits. + limitHintHelper execinfra.LimitHintHelper + mem struct { // inputBatchSize tracks the size of the rows that have been used to // generate spans so far. This is used to prevent memory usage from growing @@ -176,7 +180,7 @@ func (s *ColIndexJoin) Next() coldata.Batch { for { switch s.state { case indexJoinConstructingSpans: - var rowCount int + var rowCount int64 var spans roachpb.Spans s.mem.inputBatchSize = 0 for s.next() { @@ -184,14 +188,27 @@ func (s *ColIndexJoin) Next() coldata.Batch { // reference to input tuples after span generation. So, we can discard // the input batch reference on each iteration. endIdx := s.findEndIndex(rowCount > 0) - rowCount += endIdx - s.startIdx + // If we have a limit hint, make sure we don't include more rows + // than needed. + if l := s.limitHintHelper.LimitHint(); l != 0 && rowCount+int64(endIdx-s.startIdx) > l { + endIdx = s.startIdx + int(l-rowCount) + } + rowCount += int64(endIdx - s.startIdx) s.spanAssembler.ConsumeBatch(s.batch, s.startIdx, endIdx) s.startIdx = endIdx + if l := s.limitHintHelper.LimitHint(); l != 0 && rowCount == l { + // Reached the limit hint. Note that rowCount cannot be + // larger than l because we chopped the former off above. + break + } if endIdx < s.batch.Length() { // Reached the memory limit. break } } + if err := s.limitHintHelper.ReadSomeRows(rowCount); err != nil { + colexecerror.InternalError(err) + } spans = s.spanAssembler.GetSpans() if len(spans) == 0 { // No lookups left to perform. @@ -441,6 +458,7 @@ func NewColIndexJoin( flowCtx *execinfra.FlowCtx, input colexecop.Operator, spec *execinfrapb.JoinReaderSpec, + post *execinfrapb.PostProcessSpec, inputTypes []*types.T, diskMonitor *mon.BytesMonitor, ) (*ColIndexJoin, error) { @@ -508,6 +526,7 @@ func NewColIndexJoin( ResultTypes: tableArgs.typs, maintainOrdering: spec.MaintainOrdering, usesStreamer: useStreamer, + limitHintHelper: execinfra.MakeLimitHintHelper(spec.LimitHint, post), } op.mem.inputBatchSizeLimit = inputBatchSizeLimit op.prepareMemLimit(inputTypes) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index a92376b4d0a3..9a3378ec40fc 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -2233,6 +2233,7 @@ func (dsp *DistSQLPlanner) createPlanForIndexJoin( LockingStrength: n.table.lockingStrength, LockingWaitPolicy: n.table.lockingWaitPolicy, MaintainOrdering: len(n.reqOrdering) > 0, + LimitHint: int64(n.limitHint), } fetchColIDs := make([]descpb.ColumnID, len(n.cols)) @@ -2301,6 +2302,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin( LeftJoinWithPairedJoiner: n.isSecondJoinInPairedJoiner, OutputGroupContinuationForLeftRow: n.isFirstJoinInPairedJoiner, LookupBatchBytesLimit: dsp.distSQLSrv.TestingKnobs.JoinReaderBatchBytesLimit, + LimitHint: int64(n.limitHint), } fetchColIDs := make([]descpb.ColumnID, len(n.table.cols)) diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 4c93bee3375b..49bd7023efb0 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -640,6 +640,7 @@ func (e *distSQLSpecExecFactory) ConstructIndexJoin( keyCols []exec.NodeColumnOrdinal, tableCols exec.TableColumnOrdinalSet, reqOrdering exec.OutputOrdering, + limitHint int, ) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: index join") } @@ -659,6 +660,7 @@ func (e *distSQLSpecExecFactory) ConstructLookupJoin( isSecondJoinInPairedJoiner bool, reqOrdering exec.OutputOrdering, locking *tree.LockingItem, + limitHint int, ) (exec.Node, error) { // TODO (rohany): Implement production of system columns by the underlying scan here. return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: lookup join") diff --git a/pkg/sql/execinfra/readerbase.go b/pkg/sql/execinfra/readerbase.go index 12a010c014eb..4d944b5b6952 100644 --- a/pkg/sql/execinfra/readerbase.go +++ b/pkg/sql/execinfra/readerbase.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" ) // We ignore any limits that are higher than this value to avoid integer @@ -124,3 +125,77 @@ func (s *SpansWithCopy) Reset() { s.Spans = nil s.SpansCopy = s.SpansCopy[:0] } + +// limitHintBatchCount tracks how many times the caller has read LimitHint() +// number of rows. +type limitHintBatchCount int + +const ( + limitHintFirstBatch limitHintBatchCount = iota + limitHintSecondBatch + limitHintDisabled +) + +// limitHintSecondBatchFactor is a multiple used when determining the limit hint +// for the second batch of rows. This will be used when the original limit hint +// turned out to be insufficient to satisfy the query. +const limitHintSecondBatchFactor = 10 + +// LimitHintHelper is used for lookup and index joins in order to limit batches +// of input rows in the presence of hard and soft limits. +type LimitHintHelper struct { + origLimitHint int64 + // currentLimitHint of zero indicates that the limit hint is disabled. + currentLimitHint int64 + limitHintIdx limitHintBatchCount +} + +// MakeLimitHintHelper creates a new LimitHintHelper. +func MakeLimitHintHelper(specLimitHint int64, post *execinfrapb.PostProcessSpec) LimitHintHelper { + limitHint := LimitHint(specLimitHint, post) + return LimitHintHelper{ + origLimitHint: limitHint, + currentLimitHint: limitHint, + limitHintIdx: limitHintFirstBatch, + } +} + +// LimitHint returns the current guess on the remaining rows that need to be +// read. Zero is returned when the limit hint is disabled. +func (h *LimitHintHelper) LimitHint() int64 { + return h.currentLimitHint +} + +// ReadSomeRows notifies the helper that its user has fetched the specified +// number of rows. An error is returned when the user fetched more rows than the +// current limit hint. +func (h *LimitHintHelper) ReadSomeRows(rowsRead int64) error { + if h.currentLimitHint != 0 { + h.currentLimitHint -= rowsRead + if h.currentLimitHint == 0 { + // Set up the limit hint for the next batch of input rows if the + // current batch turns out to be insufficient. + // + // If we just finished the first batch of rows, then use the + // original limit hint times limitHintSecondBatchFactor. If we + // finished the second or any of the following batches, then we keep + // the limit hint as zero (i.e. disabled) since it appears that our + // original hint was either way off or many input rows result in + // lookup misses. + switch h.limitHintIdx { + case limitHintFirstBatch: + h.currentLimitHint = limitHintSecondBatchFactor * h.origLimitHint + h.limitHintIdx = limitHintSecondBatch + default: + h.currentLimitHint = 0 + h.limitHintIdx = limitHintDisabled + } + } else if h.currentLimitHint < 0 { + return errors.AssertionFailedf( + "unexpectedly the user of LimitHintHelper read " + + "more rows that the current limit hint", + ) + } + } + return nil +} diff --git a/pkg/sql/execinfrapb/processors_sql.proto b/pkg/sql/execinfrapb/processors_sql.proto index f4c3ff32410b..c05d224c9057 100644 --- a/pkg/sql/execinfrapb/processors_sql.proto +++ b/pkg/sql/execinfrapb/processors_sql.proto @@ -379,6 +379,15 @@ message JoinReaderSpec { // used for lookups - it depends on whether the joiner decides it wants // DistSender-parallelism or not. optional int64 lookup_batch_bytes_limit = 18 [(gogoproto.nullable) = false]; + + // A hint for how many rows the consumer of the join reader output might + // need. This is used to size the initial batches of input rows to try to + // avoid reading many more rows than needed by the processor receiving the + // output. + // + // Not used if there is a limit set in the PostProcessSpec of this processor + // (that value will be used for sizing batches instead). + optional int64 limit_hint = 21 [(gogoproto.nullable) = false]; } // SorterSpec is the specification for a "sorting aggregator". A sorting diff --git a/pkg/sql/index_join.go b/pkg/sql/index_join.go index 0742a6a0a952..6d5b2c9baee5 100644 --- a/pkg/sql/index_join.go +++ b/pkg/sql/index_join.go @@ -36,6 +36,8 @@ type indexJoinNode struct { resultColumns colinfo.ResultColumns reqOrdering ReqOrdering + + limitHint int } func (n *indexJoinNode) startExec(params runParams) error { diff --git a/pkg/sql/lookup_join.go b/pkg/sql/lookup_join.go index d91dc01af15a..1358232cdd9f 100644 --- a/pkg/sql/lookup_join.go +++ b/pkg/sql/lookup_join.go @@ -70,6 +70,8 @@ type lookupJoinNode struct { isSecondJoinInPairedJoiner bool reqOrdering ReqOrdering + + limitHint int } func (lj *lookupJoinNode) startExec(params runParams) error { diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index de7331df408d..3fb705274551 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -1723,7 +1723,7 @@ func (b *Builder) buildIndexJoin(join *memo.IndexJoinExpr) (execPlan, error) { needed, output := b.getColumns(cols, join.Table) res := execPlan{outputCols: output} res.root, err = b.factory.ConstructIndexJoin( - input.root, tab, keyCols, needed, res.reqOrdering(join), + input.root, tab, keyCols, needed, res.reqOrdering(join), int(math.Ceil(join.RequiredPhysical().LimitHint)), ) if err != nil { return execPlan{}, err @@ -1832,6 +1832,7 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) { join.IsSecondJoinInPairedJoiner, res.reqOrdering(join), locking, + int(math.Ceil(join.RequiredPhysical().LimitHint)), ) if err != nil { return execPlan{}, err diff --git a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit new file mode 100644 index 000000000000..026acd99953b --- /dev/null +++ b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit @@ -0,0 +1,346 @@ +# LogicTest: local + +# This test file verifies that the lookup and index joins don't fetch too many +# rows eagerly in the presence of limit hints. + +statement ok +CREATE TABLE a (x INT PRIMARY KEY, y INT, z INT, INDEX (y)); +CREATE TABLE b (x INT PRIMARY KEY); +INSERT INTO a VALUES (1, 1, 1), (2, 1, 1), (3, 2, 2), (4, 2, 2); +INSERT INTO b VALUES (1), (2), (3), (4); + +# Query with an index join and a limit hint. +query T +EXPLAIN (OPT, VERBOSE) SELECT * FROM (SELECT * FROM a WHERE y = 1 UNION ALL SELECT * FROM a WHERE y = 2) LIMIT 1 +---- +limit + ├── columns: x:11 y:12 z:13 + ├── cardinality: [0 - 1] + ├── stats: [rows=1] + ├── cost: 152.280001 + ├── key: () + ├── fd: ()-->(11-13) + ├── distribution: test + ├── prune: (11,13) + ├── union-all + │ ├── columns: x:11 y:12 z:13 + │ ├── left columns: a.x:1 a.y:2 a.z:3 + │ ├── right columns: a.x:6 a.y:7 a.z:8 + │ ├── stats: [rows=20] + │ ├── cost: 152.260001 + │ ├── limit hint: 1.00 + │ ├── distribution: test + │ ├── prune: (11,13) + │ ├── index-join a + │ │ ├── columns: a.x:1 a.y:2 a.z:3 + │ │ ├── stats: [rows=10, distinct(2)=1, null(2)=0, avgsize(2)=4] + │ │ ├── cost: 76.0200006 + │ │ ├── key: (1) + │ │ ├── fd: ()-->(2), (1)-->(3) + │ │ ├── limit hint: 1.00 + │ │ ├── distribution: test + │ │ ├── prune: (1,3) + │ │ └── scan a@a_y_idx + │ │ ├── columns: a.x:1 a.y:2 + │ │ ├── constraint: /2/1: [/1 - /1] + │ │ ├── stats: [rows=10, distinct(2)=1, null(2)=0, avgsize(2)=4] + │ │ ├── cost: 15.1 + │ │ ├── key: (1) + │ │ ├── fd: ()-->(2) + │ │ ├── limit hint: 1.00 + │ │ └── distribution: test + │ └── index-join a + │ ├── columns: a.x:6 a.y:7 a.z:8 + │ ├── stats: [rows=10, distinct(7)=1, null(7)=0, avgsize(7)=4] + │ ├── cost: 76.0200006 + │ ├── key: (6) + │ ├── fd: ()-->(7), (6)-->(8) + │ ├── limit hint: 1.00 + │ ├── distribution: test + │ ├── prune: (6,8) + │ └── scan a@a_y_idx + │ ├── columns: a.x:6 a.y:7 + │ ├── constraint: /7/6: [/2 - /2] + │ ├── stats: [rows=10, distinct(7)=1, null(7)=0, avgsize(7)=4] + │ ├── cost: 15.1 + │ ├── key: (6) + │ ├── fd: ()-->(7) + │ ├── limit hint: 1.00 + │ └── distribution: test + └── 1 + +# Run through the vectorized engine. Make sure that only a single row is scanned +# and then a single row is looked up by the index join. +query T +EXPLAIN ANALYZE SELECT * FROM (SELECT * FROM a WHERE y = 1 UNION ALL SELECT * FROM a WHERE y = 2) LIMIT 1 +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +rows read from KV: 2 (16 B) +maximum memory usage: +network usage: +regions: +· +• limit +│ nodes: +│ regions: +│ actual row count: 1 +│ count: 1 +│ +└── • union all + │ nodes: + │ regions: + │ actual row count: 1 + │ + ├── • index join + │ │ nodes: + │ │ regions: + │ │ actual row count: 1 + │ │ KV time: 0µs + │ │ KV contention time: 0µs + │ │ KV rows read: 1 + │ │ KV bytes read: 8 B + │ │ estimated max memory allocated: 0 B + │ │ estimated max sql temp disk usage: 0 B + │ │ table: a@a_pkey + │ │ + │ └── • scan + │ nodes: + │ regions: + │ actual row count: 1 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows read: 1 + │ KV bytes read: 8 B + │ estimated max memory allocated: 0 B + │ missing stats + │ table: a@a_y_idx + │ spans: [/1 - /1] + │ + └── • index join + │ nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows read: 0 + │ KV bytes read: 0 B + │ estimated max memory allocated: 0 B + │ estimated max sql temp disk usage: 0 B + │ table: a@a_pkey + │ + └── • scan + nodes: + regions: + actual row count: 0 + KV time: 0µs + KV contention time: 0µs + KV rows read: 0 + KV bytes read: 0 B + estimated max memory allocated: 0 B + missing stats + table: a@a_y_idx + spans: [/2 - /2] + +statement ok +SET vectorize = off + +# Run through the row-by-row engine. Make sure that only a single row is scanned +# and then a single row is looked up by the index join. +query T +EXPLAIN ANALYZE SELECT * FROM (SELECT * FROM a WHERE y = 1 UNION ALL SELECT * FROM a WHERE y = 2) LIMIT 1 +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +rows read from KV: 2 (16 B) +maximum memory usage: +network usage: +regions: +· +• limit +│ nodes: +│ regions: +│ actual row count: 1 +│ count: 1 +│ +└── • union all + │ nodes: + │ regions: + │ actual row count: 1 + │ + ├── • index join + │ │ nodes: + │ │ regions: + │ │ actual row count: 1 + │ │ KV time: 0µs + │ │ KV contention time: 0µs + │ │ KV rows read: 1 + │ │ KV bytes read: 8 B + │ │ table: a@a_pkey + │ │ + │ └── • scan + │ nodes: + │ regions: + │ actual row count: 1 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows read: 1 + │ KV bytes read: 8 B + │ missing stats + │ table: a@a_y_idx + │ spans: [/1 - /1] + │ + └── • index join + │ nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows read: 0 + │ KV bytes read: 0 B + │ table: a@a_pkey + │ + └── • scan + nodes: + regions: + actual row count: 0 + KV time: 0µs + KV contention time: 0µs + KV rows read: 0 + KV bytes read: 0 B + missing stats + table: a@a_y_idx + spans: [/2 - /2] + +statement ok +RESET vectorize + +# Inject such stats that the query below will have a limit hint of 1 for the +# scan. +statement ok +ALTER TABLE a INJECT STATISTICS '[ + { + "avg_size": 1, + "columns": ["x"], + "created_at": "2022-03-22 00:00:00", + "distinct_count": 1, + "name": "__auto__", + "null_count": 0, + "row_count": 1 + }, + { + "avg_size": 1, + "columns": ["y"], + "created_at": "2022-03-22 00:00:00", + "distinct_count": 1, + "name": "__auto__", + "null_count": 0, + "row_count": 1 + }, + { + "avg_size": 1, + "columns": ["z"], + "created_at": "2022-03-22 00:00:00", + "distinct_count": 1, + "name": "__auto__", + "null_count": 0, + "row_count": 1 + } + ]' + +# Query with a lookup join and a limit hint. +query T +EXPLAIN (OPT, VERBOSE) SELECT b.x FROM a, b WHERE a.x = b.x LIMIT 1 +---- +project + ├── columns: x:6 + ├── cardinality: [0 - 1] + ├── stats: [rows=1] + ├── cost: 21.145 + ├── key: () + ├── fd: ()-->(6) + ├── distribution: test + ├── prune: (6) + └── limit + ├── columns: a.x:1 b.x:6 + ├── cardinality: [0 - 1] + ├── stats: [rows=1] + ├── cost: 21.125 + ├── key: () + ├── fd: ()-->(1,6), (6)==(1), (1)==(6) + ├── distribution: test + ├── inner-join (lookup b) + │ ├── columns: a.x:1 b.x:6 + │ ├── key columns: [1] = [6] + │ ├── lookup columns are key + │ ├── stats: [rows=1, distinct(1)=1, null(1)=0, avgsize(1)=1, distinct(6)=1, null(6)=0, avgsize(6)=4] + │ ├── cost: 21.105 + │ ├── key: (6) + │ ├── fd: (1)==(6), (6)==(1) + │ ├── limit hint: 1.00 + │ ├── distribution: test + │ ├── scan a@a_y_idx + │ │ ├── columns: a.x:1 + │ │ ├── stats: [rows=1, distinct(1)=1, null(1)=0, avgsize(1)=1] + │ │ ├── cost: 15.035 + │ │ ├── key: (1) + │ │ ├── limit hint: 1.00 + │ │ ├── distribution: test + │ │ ├── prune: (1) + │ │ ├── interesting orderings: (+1) + │ │ └── unfiltered-cols: (1-5) + │ └── filters (true) + └── 1 + +# Perform a lookup join. Make sure that a single row is scanned and then a +# single row is looked up. +query T +EXPLAIN ANALYZE SELECT b.x FROM a, b WHERE a.x = b.x LIMIT 1 +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +rows read from KV: 2 (16 B) +maximum memory usage: +network usage: +regions: +· +• limit +│ nodes: +│ regions: +│ actual row count: 1 +│ KV time: 0µs +│ KV contention time: 0µs +│ KV rows read: 1 +│ KV bytes read: 8 B +│ count: 1 +│ +└── • lookup join + │ nodes: + │ regions: + │ actual row count: 1 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows read: 1 + │ KV bytes read: 8 B + │ table: b@b_pkey + │ equality: (x) = (x) + │ equality cols are key + │ + └── • scan + nodes: + regions: + actual row count: 1 + KV time: 0µs + KV contention time: 0µs + KV rows read: 1 + KV bytes read: 8 B + estimated max memory allocated: 0 B + estimated row count: 1 (100% of the table; stats collected ago) + table: a@a_y_idx + spans: FULL SCAN diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt index fae22c98da61..f8dde910e2c7 100644 --- a/pkg/sql/opt/exec/factory.opt +++ b/pkg/sql/opt/exec/factory.opt @@ -249,6 +249,7 @@ define IndexJoin { KeyCols []exec.NodeColumnOrdinal TableCols exec.TableColumnOrdinalSet ReqOrdering exec.OutputOrdering + LimitHint int } # LookupJoin performs a lookup join. @@ -284,6 +285,7 @@ define LookupJoin { IsSecondJoinInPairedJoiner bool ReqOrdering exec.OutputOrdering Locking *tree.LockingItem + LimitHint int } # InvertedJoin performs a lookup join into an inverted index. diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 9cb39e28c53e..a65b2dd63f9a 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -597,6 +597,7 @@ func (ef *execFactory) ConstructIndexJoin( keyCols []exec.NodeColumnOrdinal, tableCols exec.TableColumnOrdinalSet, reqOrdering exec.OutputOrdering, + limitHint int, ) (exec.Node, error) { tabDesc := table.(*optTable).desc colCfg := makeScanColumnsConfig(table, tableCols) @@ -618,6 +619,7 @@ func (ef *execFactory) ConstructIndexJoin( cols: cols, resultColumns: colinfo.ResultColumnsFromColumns(tabDesc.GetID(), cols), reqOrdering: ReqOrdering(reqOrdering), + limitHint: limitHint, } n.keyCols = make([]int, len(keyCols)) @@ -644,6 +646,7 @@ func (ef *execFactory) ConstructLookupJoin( isSecondJoinInPairedJoiner bool, reqOrdering exec.OutputOrdering, locking *tree.LockingItem, + limitHint int, ) (exec.Node, error) { if table.IsVirtualTable() { return ef.constructVirtualTableLookupJoin(joinType, input, table, index, eqCols, lookupCols, onCond) @@ -680,6 +683,7 @@ func (ef *execFactory) ConstructLookupJoin( isFirstJoinInPairedJoiner: isFirstJoinInPairedJoiner, isSecondJoinInPairedJoiner: isSecondJoinInPairedJoiner, reqOrdering: ReqOrdering(reqOrdering), + limitHint: limitHint, } n.eqCols = make([]int, len(eqCols)) for i, c := range eqCols { diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 63f3ccacc54b..77d8dace4394 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -217,6 +217,10 @@ type joinReader struct { // used. lookupBatchBytesLimit rowinfra.BytesLimit + // limitHintHelper is used in limiting batches of input rows in the presence + // of hard and soft limits. + limitHintHelper execinfra.LimitHintHelper + // scanStats is collected from the trace after we finish doing work for this // join. scanStats execinfra.ScanStats @@ -316,6 +320,7 @@ func newJoinReader( lockWaitPolicy: row.GetWaitPolicy(spec.LockingWaitPolicy), usesStreamer: useStreamer, lookupBatchBytesLimit: rowinfra.BytesLimit(spec.LookupBatchBytesLimit), + limitHintHelper: execinfra.MakeLimitHintHelper(spec.LimitHint, post), } if readerType != indexJoinReaderType { jr.groupingState = &inputBatchGroupingState{doGrouping: spec.LeftJoinWithPairedJoiner} @@ -751,6 +756,10 @@ func (jr *joinReader) readInput() ( return jrStateUnknown, nil, jr.DrainHelper() } jr.scratchInputRows = append(jr.scratchInputRows, jr.rowAlloc.CopyRow(encDatumRow)) + + if l := jr.limitHintHelper.LimitHint(); l != 0 && l == int64(len(jr.scratchInputRows)) { + break + } } if err := jr.performMemoryAccounting(); err != nil { @@ -780,6 +789,11 @@ func (jr *joinReader) readInput() ( jr.updateGroupingStateForNonEmptyBatch() } + if err := jr.limitHintHelper.ReadSomeRows(int64(len(jr.scratchInputRows))); err != nil { + jr.MoveToDraining(err) + return jrStateUnknown, nil, jr.DrainHelper() + } + // Figure out what key spans we need to lookup. spans, err := jr.strategy.processLookupRows(jr.scratchInputRows) if err != nil {