From b69ad23d37ba3eb01ad795d09c7482d1a4aaeed9 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 11 Mar 2022 14:19:08 -0800 Subject: [PATCH 1/2] sql: use soft and hard limits in lookup and index joins Previously, during the execution of lookup and index joins we completely ignored the soft and hard limits and, instead, always fetched up to the memory-based limit (which depends on the type of the join). This could lead to having to read many more rows from the input and then looking up many more rows from the KV layer than necessary. This is now fixed by plumbing the soft and hard limits from the optimizer and using them when sizing the batches of input rows to perform lookup for. For index joins we know for sure that every input row will get a looked up row; however, for lookup joins an input row might result in a miss. To work around this we use a simple heuristic for determining the limit-hint-based size of input batches: for the first batch we use the limit hint as is, for the second batch we use 10x of the original hint, and for the third and all consequent batches we disable the limiting behavior altogether. Release note (bug fix): CockroachDB might now fetch less rows when performing lookup and index joins on the queries with LIMIT clause. --- pkg/settings/registry.go | 1 + pkg/sql/colexec/colbuilder/execplan.go | 2 +- pkg/sql/colfetcher/index_join.go | 23 +- pkg/sql/distsql_physical_planner.go | 2 + pkg/sql/distsql_spec_exec_factory.go | 2 + pkg/sql/execinfra/readerbase.go | 75 ++++ pkg/sql/execinfrapb/processors_sql.proto | 9 + pkg/sql/index_join.go | 2 + pkg/sql/lookup_join.go | 2 + pkg/sql/opt/exec/execbuilder/relational.go | 3 +- .../execbuilder/testdata/lookup_join_limit | 346 ++++++++++++++++++ pkg/sql/opt/exec/factory.opt | 2 + pkg/sql/opt_exec_factory.go | 4 + pkg/sql/rowexec/joinreader.go | 14 + 14 files changed, 483 insertions(+), 4 deletions(-) create mode 100644 pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 35709db8b38c..9728adc9db64 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -109,6 +109,7 @@ var retiredSettings = map[string]struct{}{ "sql.telemetry.query_sampling.sample_rate": {}, "diagnostics.sql_stat_reset.interval": {}, "changefeed.mem.pushback_enabled": {}, + "sql.distsql.index_join_limit_hint.enabled": {}, // removed as of 22.1. "sql.defaults.drop_enum_value.enabled": {}, diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 4fa38c8c6956..5208070afdb2 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -762,7 +762,7 @@ func NewColOperator( ctx, getStreamingAllocator(ctx, args), colmem.NewAllocator(ctx, cFetcherMemAcc, factory), kvFetcherMemAcc, streamerBudgetAcc, flowCtx, - inputs[0].Root, core.JoinReader, inputTypes, streamerDiskMonitor, + inputs[0].Root, core.JoinReader, post, inputTypes, streamerDiskMonitor, ) if err != nil { return r, err diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index f4bae88f3c17..415cdcb4c65e 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -62,6 +62,10 @@ type ColIndexJoin struct { // and may not correspond to batch boundaries. startIdx int + // limitHintHelper is used in limiting batches of input rows in the presence + // of hard and soft limits. + limitHintHelper execinfra.LimitHintHelper + mem struct { // inputBatchSize tracks the size of the rows that have been used to // generate spans so far. This is used to prevent memory usage from growing @@ -176,7 +180,7 @@ func (s *ColIndexJoin) Next() coldata.Batch { for { switch s.state { case indexJoinConstructingSpans: - var rowCount int + var rowCount int64 var spans roachpb.Spans s.mem.inputBatchSize = 0 for s.next() { @@ -184,14 +188,27 @@ func (s *ColIndexJoin) Next() coldata.Batch { // reference to input tuples after span generation. So, we can discard // the input batch reference on each iteration. endIdx := s.findEndIndex(rowCount > 0) - rowCount += endIdx - s.startIdx + // If we have a limit hint, make sure we don't include more rows + // than needed. + if l := s.limitHintHelper.LimitHint(); l != 0 && rowCount+int64(endIdx-s.startIdx) > l { + endIdx = s.startIdx + int(l-rowCount) + } + rowCount += int64(endIdx - s.startIdx) s.spanAssembler.ConsumeBatch(s.batch, s.startIdx, endIdx) s.startIdx = endIdx + if l := s.limitHintHelper.LimitHint(); l != 0 && rowCount == l { + // Reached the limit hint. Note that rowCount cannot be + // larger than l because we chopped the former off above. + break + } if endIdx < s.batch.Length() { // Reached the memory limit. break } } + if err := s.limitHintHelper.ReadSomeRows(rowCount); err != nil { + colexecerror.InternalError(err) + } spans = s.spanAssembler.GetSpans() if len(spans) == 0 { // No lookups left to perform. @@ -441,6 +458,7 @@ func NewColIndexJoin( flowCtx *execinfra.FlowCtx, input colexecop.Operator, spec *execinfrapb.JoinReaderSpec, + post *execinfrapb.PostProcessSpec, inputTypes []*types.T, diskMonitor *mon.BytesMonitor, ) (*ColIndexJoin, error) { @@ -508,6 +526,7 @@ func NewColIndexJoin( ResultTypes: tableArgs.typs, maintainOrdering: spec.MaintainOrdering, usesStreamer: useStreamer, + limitHintHelper: execinfra.MakeLimitHintHelper(spec.LimitHint, post), } op.mem.inputBatchSizeLimit = inputBatchSizeLimit op.prepareMemLimit(inputTypes) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index a92376b4d0a3..9a3378ec40fc 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -2233,6 +2233,7 @@ func (dsp *DistSQLPlanner) createPlanForIndexJoin( LockingStrength: n.table.lockingStrength, LockingWaitPolicy: n.table.lockingWaitPolicy, MaintainOrdering: len(n.reqOrdering) > 0, + LimitHint: int64(n.limitHint), } fetchColIDs := make([]descpb.ColumnID, len(n.cols)) @@ -2301,6 +2302,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin( LeftJoinWithPairedJoiner: n.isSecondJoinInPairedJoiner, OutputGroupContinuationForLeftRow: n.isFirstJoinInPairedJoiner, LookupBatchBytesLimit: dsp.distSQLSrv.TestingKnobs.JoinReaderBatchBytesLimit, + LimitHint: int64(n.limitHint), } fetchColIDs := make([]descpb.ColumnID, len(n.table.cols)) diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 4c93bee3375b..49bd7023efb0 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -640,6 +640,7 @@ func (e *distSQLSpecExecFactory) ConstructIndexJoin( keyCols []exec.NodeColumnOrdinal, tableCols exec.TableColumnOrdinalSet, reqOrdering exec.OutputOrdering, + limitHint int, ) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: index join") } @@ -659,6 +660,7 @@ func (e *distSQLSpecExecFactory) ConstructLookupJoin( isSecondJoinInPairedJoiner bool, reqOrdering exec.OutputOrdering, locking *tree.LockingItem, + limitHint int, ) (exec.Node, error) { // TODO (rohany): Implement production of system columns by the underlying scan here. return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: lookup join") diff --git a/pkg/sql/execinfra/readerbase.go b/pkg/sql/execinfra/readerbase.go index 12a010c014eb..4d944b5b6952 100644 --- a/pkg/sql/execinfra/readerbase.go +++ b/pkg/sql/execinfra/readerbase.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" ) // We ignore any limits that are higher than this value to avoid integer @@ -124,3 +125,77 @@ func (s *SpansWithCopy) Reset() { s.Spans = nil s.SpansCopy = s.SpansCopy[:0] } + +// limitHintBatchCount tracks how many times the caller has read LimitHint() +// number of rows. +type limitHintBatchCount int + +const ( + limitHintFirstBatch limitHintBatchCount = iota + limitHintSecondBatch + limitHintDisabled +) + +// limitHintSecondBatchFactor is a multiple used when determining the limit hint +// for the second batch of rows. This will be used when the original limit hint +// turned out to be insufficient to satisfy the query. +const limitHintSecondBatchFactor = 10 + +// LimitHintHelper is used for lookup and index joins in order to limit batches +// of input rows in the presence of hard and soft limits. +type LimitHintHelper struct { + origLimitHint int64 + // currentLimitHint of zero indicates that the limit hint is disabled. + currentLimitHint int64 + limitHintIdx limitHintBatchCount +} + +// MakeLimitHintHelper creates a new LimitHintHelper. +func MakeLimitHintHelper(specLimitHint int64, post *execinfrapb.PostProcessSpec) LimitHintHelper { + limitHint := LimitHint(specLimitHint, post) + return LimitHintHelper{ + origLimitHint: limitHint, + currentLimitHint: limitHint, + limitHintIdx: limitHintFirstBatch, + } +} + +// LimitHint returns the current guess on the remaining rows that need to be +// read. Zero is returned when the limit hint is disabled. +func (h *LimitHintHelper) LimitHint() int64 { + return h.currentLimitHint +} + +// ReadSomeRows notifies the helper that its user has fetched the specified +// number of rows. An error is returned when the user fetched more rows than the +// current limit hint. +func (h *LimitHintHelper) ReadSomeRows(rowsRead int64) error { + if h.currentLimitHint != 0 { + h.currentLimitHint -= rowsRead + if h.currentLimitHint == 0 { + // Set up the limit hint for the next batch of input rows if the + // current batch turns out to be insufficient. + // + // If we just finished the first batch of rows, then use the + // original limit hint times limitHintSecondBatchFactor. If we + // finished the second or any of the following batches, then we keep + // the limit hint as zero (i.e. disabled) since it appears that our + // original hint was either way off or many input rows result in + // lookup misses. + switch h.limitHintIdx { + case limitHintFirstBatch: + h.currentLimitHint = limitHintSecondBatchFactor * h.origLimitHint + h.limitHintIdx = limitHintSecondBatch + default: + h.currentLimitHint = 0 + h.limitHintIdx = limitHintDisabled + } + } else if h.currentLimitHint < 0 { + return errors.AssertionFailedf( + "unexpectedly the user of LimitHintHelper read " + + "more rows that the current limit hint", + ) + } + } + return nil +} diff --git a/pkg/sql/execinfrapb/processors_sql.proto b/pkg/sql/execinfrapb/processors_sql.proto index f4c3ff32410b..c05d224c9057 100644 --- a/pkg/sql/execinfrapb/processors_sql.proto +++ b/pkg/sql/execinfrapb/processors_sql.proto @@ -379,6 +379,15 @@ message JoinReaderSpec { // used for lookups - it depends on whether the joiner decides it wants // DistSender-parallelism or not. optional int64 lookup_batch_bytes_limit = 18 [(gogoproto.nullable) = false]; + + // A hint for how many rows the consumer of the join reader output might + // need. This is used to size the initial batches of input rows to try to + // avoid reading many more rows than needed by the processor receiving the + // output. + // + // Not used if there is a limit set in the PostProcessSpec of this processor + // (that value will be used for sizing batches instead). + optional int64 limit_hint = 21 [(gogoproto.nullable) = false]; } // SorterSpec is the specification for a "sorting aggregator". A sorting diff --git a/pkg/sql/index_join.go b/pkg/sql/index_join.go index 0742a6a0a952..6d5b2c9baee5 100644 --- a/pkg/sql/index_join.go +++ b/pkg/sql/index_join.go @@ -36,6 +36,8 @@ type indexJoinNode struct { resultColumns colinfo.ResultColumns reqOrdering ReqOrdering + + limitHint int } func (n *indexJoinNode) startExec(params runParams) error { diff --git a/pkg/sql/lookup_join.go b/pkg/sql/lookup_join.go index d91dc01af15a..1358232cdd9f 100644 --- a/pkg/sql/lookup_join.go +++ b/pkg/sql/lookup_join.go @@ -70,6 +70,8 @@ type lookupJoinNode struct { isSecondJoinInPairedJoiner bool reqOrdering ReqOrdering + + limitHint int } func (lj *lookupJoinNode) startExec(params runParams) error { diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index de7331df408d..3fb705274551 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -1723,7 +1723,7 @@ func (b *Builder) buildIndexJoin(join *memo.IndexJoinExpr) (execPlan, error) { needed, output := b.getColumns(cols, join.Table) res := execPlan{outputCols: output} res.root, err = b.factory.ConstructIndexJoin( - input.root, tab, keyCols, needed, res.reqOrdering(join), + input.root, tab, keyCols, needed, res.reqOrdering(join), int(math.Ceil(join.RequiredPhysical().LimitHint)), ) if err != nil { return execPlan{}, err @@ -1832,6 +1832,7 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) { join.IsSecondJoinInPairedJoiner, res.reqOrdering(join), locking, + int(math.Ceil(join.RequiredPhysical().LimitHint)), ) if err != nil { return execPlan{}, err diff --git a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit new file mode 100644 index 000000000000..026acd99953b --- /dev/null +++ b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit @@ -0,0 +1,346 @@ +# LogicTest: local + +# This test file verifies that the lookup and index joins don't fetch too many +# rows eagerly in the presence of limit hints. + +statement ok +CREATE TABLE a (x INT PRIMARY KEY, y INT, z INT, INDEX (y)); +CREATE TABLE b (x INT PRIMARY KEY); +INSERT INTO a VALUES (1, 1, 1), (2, 1, 1), (3, 2, 2), (4, 2, 2); +INSERT INTO b VALUES (1), (2), (3), (4); + +# Query with an index join and a limit hint. +query T +EXPLAIN (OPT, VERBOSE) SELECT * FROM (SELECT * FROM a WHERE y = 1 UNION ALL SELECT * FROM a WHERE y = 2) LIMIT 1 +---- +limit + ├── columns: x:11 y:12 z:13 + ├── cardinality: [0 - 1] + ├── stats: [rows=1] + ├── cost: 152.280001 + ├── key: () + ├── fd: ()-->(11-13) + ├── distribution: test + ├── prune: (11,13) + ├── union-all + │ ├── columns: x:11 y:12 z:13 + │ ├── left columns: a.x:1 a.y:2 a.z:3 + │ ├── right columns: a.x:6 a.y:7 a.z:8 + │ ├── stats: [rows=20] + │ ├── cost: 152.260001 + │ ├── limit hint: 1.00 + │ ├── distribution: test + │ ├── prune: (11,13) + │ ├── index-join a + │ │ ├── columns: a.x:1 a.y:2 a.z:3 + │ │ ├── stats: [rows=10, distinct(2)=1, null(2)=0, avgsize(2)=4] + │ │ ├── cost: 76.0200006 + │ │ ├── key: (1) + │ │ ├── fd: ()-->(2), (1)-->(3) + │ │ ├── limit hint: 1.00 + │ │ ├── distribution: test + │ │ ├── prune: (1,3) + │ │ └── scan a@a_y_idx + │ │ ├── columns: a.x:1 a.y:2 + │ │ ├── constraint: /2/1: [/1 - /1] + │ │ ├── stats: [rows=10, distinct(2)=1, null(2)=0, avgsize(2)=4] + │ │ ├── cost: 15.1 + │ │ ├── key: (1) + │ │ ├── fd: ()-->(2) + │ │ ├── limit hint: 1.00 + │ │ └── distribution: test + │ └── index-join a + │ ├── columns: a.x:6 a.y:7 a.z:8 + │ ├── stats: [rows=10, distinct(7)=1, null(7)=0, avgsize(7)=4] + │ ├── cost: 76.0200006 + │ ├── key: (6) + │ ├── fd: ()-->(7), (6)-->(8) + │ ├── limit hint: 1.00 + │ ├── distribution: test + │ ├── prune: (6,8) + │ └── scan a@a_y_idx + │ ├── columns: a.x:6 a.y:7 + │ ├── constraint: /7/6: [/2 - /2] + │ ├── stats: [rows=10, distinct(7)=1, null(7)=0, avgsize(7)=4] + │ ├── cost: 15.1 + │ ├── key: (6) + │ ├── fd: ()-->(7) + │ ├── limit hint: 1.00 + │ └── distribution: test + └── 1 + +# Run through the vectorized engine. Make sure that only a single row is scanned +# and then a single row is looked up by the index join. +query T +EXPLAIN ANALYZE SELECT * FROM (SELECT * FROM a WHERE y = 1 UNION ALL SELECT * FROM a WHERE y = 2) LIMIT 1 +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +rows read from KV: 2 (16 B) +maximum memory usage: +network usage: +regions: +· +• limit +│ nodes: +│ regions: +│ actual row count: 1 +│ count: 1 +│ +└── • union all + │ nodes: + │ regions: + │ actual row count: 1 + │ + ├── • index join + │ │ nodes: + │ │ regions: + │ │ actual row count: 1 + │ │ KV time: 0µs + │ │ KV contention time: 0µs + │ │ KV rows read: 1 + │ │ KV bytes read: 8 B + │ │ estimated max memory allocated: 0 B + │ │ estimated max sql temp disk usage: 0 B + │ │ table: a@a_pkey + │ │ + │ └── • scan + │ nodes: + │ regions: + │ actual row count: 1 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows read: 1 + │ KV bytes read: 8 B + │ estimated max memory allocated: 0 B + │ missing stats + │ table: a@a_y_idx + │ spans: [/1 - /1] + │ + └── • index join + │ nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows read: 0 + │ KV bytes read: 0 B + │ estimated max memory allocated: 0 B + │ estimated max sql temp disk usage: 0 B + │ table: a@a_pkey + │ + └── • scan + nodes: + regions: + actual row count: 0 + KV time: 0µs + KV contention time: 0µs + KV rows read: 0 + KV bytes read: 0 B + estimated max memory allocated: 0 B + missing stats + table: a@a_y_idx + spans: [/2 - /2] + +statement ok +SET vectorize = off + +# Run through the row-by-row engine. Make sure that only a single row is scanned +# and then a single row is looked up by the index join. +query T +EXPLAIN ANALYZE SELECT * FROM (SELECT * FROM a WHERE y = 1 UNION ALL SELECT * FROM a WHERE y = 2) LIMIT 1 +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +rows read from KV: 2 (16 B) +maximum memory usage: +network usage: +regions: +· +• limit +│ nodes: +│ regions: +│ actual row count: 1 +│ count: 1 +│ +└── • union all + │ nodes: + │ regions: + │ actual row count: 1 + │ + ├── • index join + │ │ nodes: + │ │ regions: + │ │ actual row count: 1 + │ │ KV time: 0µs + │ │ KV contention time: 0µs + │ │ KV rows read: 1 + │ │ KV bytes read: 8 B + │ │ table: a@a_pkey + │ │ + │ └── • scan + │ nodes: + │ regions: + │ actual row count: 1 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows read: 1 + │ KV bytes read: 8 B + │ missing stats + │ table: a@a_y_idx + │ spans: [/1 - /1] + │ + └── • index join + │ nodes: + │ regions: + │ actual row count: 0 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows read: 0 + │ KV bytes read: 0 B + │ table: a@a_pkey + │ + └── • scan + nodes: + regions: + actual row count: 0 + KV time: 0µs + KV contention time: 0µs + KV rows read: 0 + KV bytes read: 0 B + missing stats + table: a@a_y_idx + spans: [/2 - /2] + +statement ok +RESET vectorize + +# Inject such stats that the query below will have a limit hint of 1 for the +# scan. +statement ok +ALTER TABLE a INJECT STATISTICS '[ + { + "avg_size": 1, + "columns": ["x"], + "created_at": "2022-03-22 00:00:00", + "distinct_count": 1, + "name": "__auto__", + "null_count": 0, + "row_count": 1 + }, + { + "avg_size": 1, + "columns": ["y"], + "created_at": "2022-03-22 00:00:00", + "distinct_count": 1, + "name": "__auto__", + "null_count": 0, + "row_count": 1 + }, + { + "avg_size": 1, + "columns": ["z"], + "created_at": "2022-03-22 00:00:00", + "distinct_count": 1, + "name": "__auto__", + "null_count": 0, + "row_count": 1 + } + ]' + +# Query with a lookup join and a limit hint. +query T +EXPLAIN (OPT, VERBOSE) SELECT b.x FROM a, b WHERE a.x = b.x LIMIT 1 +---- +project + ├── columns: x:6 + ├── cardinality: [0 - 1] + ├── stats: [rows=1] + ├── cost: 21.145 + ├── key: () + ├── fd: ()-->(6) + ├── distribution: test + ├── prune: (6) + └── limit + ├── columns: a.x:1 b.x:6 + ├── cardinality: [0 - 1] + ├── stats: [rows=1] + ├── cost: 21.125 + ├── key: () + ├── fd: ()-->(1,6), (6)==(1), (1)==(6) + ├── distribution: test + ├── inner-join (lookup b) + │ ├── columns: a.x:1 b.x:6 + │ ├── key columns: [1] = [6] + │ ├── lookup columns are key + │ ├── stats: [rows=1, distinct(1)=1, null(1)=0, avgsize(1)=1, distinct(6)=1, null(6)=0, avgsize(6)=4] + │ ├── cost: 21.105 + │ ├── key: (6) + │ ├── fd: (1)==(6), (6)==(1) + │ ├── limit hint: 1.00 + │ ├── distribution: test + │ ├── scan a@a_y_idx + │ │ ├── columns: a.x:1 + │ │ ├── stats: [rows=1, distinct(1)=1, null(1)=0, avgsize(1)=1] + │ │ ├── cost: 15.035 + │ │ ├── key: (1) + │ │ ├── limit hint: 1.00 + │ │ ├── distribution: test + │ │ ├── prune: (1) + │ │ ├── interesting orderings: (+1) + │ │ └── unfiltered-cols: (1-5) + │ └── filters (true) + └── 1 + +# Perform a lookup join. Make sure that a single row is scanned and then a +# single row is looked up. +query T +EXPLAIN ANALYZE SELECT b.x FROM a, b WHERE a.x = b.x LIMIT 1 +---- +planning time: 10µs +execution time: 100µs +distribution: +vectorized: +rows read from KV: 2 (16 B) +maximum memory usage: +network usage: +regions: +· +• limit +│ nodes: +│ regions: +│ actual row count: 1 +│ KV time: 0µs +│ KV contention time: 0µs +│ KV rows read: 1 +│ KV bytes read: 8 B +│ count: 1 +│ +└── • lookup join + │ nodes: + │ regions: + │ actual row count: 1 + │ KV time: 0µs + │ KV contention time: 0µs + │ KV rows read: 1 + │ KV bytes read: 8 B + │ table: b@b_pkey + │ equality: (x) = (x) + │ equality cols are key + │ + └── • scan + nodes: + regions: + actual row count: 1 + KV time: 0µs + KV contention time: 0µs + KV rows read: 1 + KV bytes read: 8 B + estimated max memory allocated: 0 B + estimated row count: 1 (100% of the table; stats collected ago) + table: a@a_y_idx + spans: FULL SCAN diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt index fae22c98da61..f8dde910e2c7 100644 --- a/pkg/sql/opt/exec/factory.opt +++ b/pkg/sql/opt/exec/factory.opt @@ -249,6 +249,7 @@ define IndexJoin { KeyCols []exec.NodeColumnOrdinal TableCols exec.TableColumnOrdinalSet ReqOrdering exec.OutputOrdering + LimitHint int } # LookupJoin performs a lookup join. @@ -284,6 +285,7 @@ define LookupJoin { IsSecondJoinInPairedJoiner bool ReqOrdering exec.OutputOrdering Locking *tree.LockingItem + LimitHint int } # InvertedJoin performs a lookup join into an inverted index. diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 9cb39e28c53e..a65b2dd63f9a 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -597,6 +597,7 @@ func (ef *execFactory) ConstructIndexJoin( keyCols []exec.NodeColumnOrdinal, tableCols exec.TableColumnOrdinalSet, reqOrdering exec.OutputOrdering, + limitHint int, ) (exec.Node, error) { tabDesc := table.(*optTable).desc colCfg := makeScanColumnsConfig(table, tableCols) @@ -618,6 +619,7 @@ func (ef *execFactory) ConstructIndexJoin( cols: cols, resultColumns: colinfo.ResultColumnsFromColumns(tabDesc.GetID(), cols), reqOrdering: ReqOrdering(reqOrdering), + limitHint: limitHint, } n.keyCols = make([]int, len(keyCols)) @@ -644,6 +646,7 @@ func (ef *execFactory) ConstructLookupJoin( isSecondJoinInPairedJoiner bool, reqOrdering exec.OutputOrdering, locking *tree.LockingItem, + limitHint int, ) (exec.Node, error) { if table.IsVirtualTable() { return ef.constructVirtualTableLookupJoin(joinType, input, table, index, eqCols, lookupCols, onCond) @@ -680,6 +683,7 @@ func (ef *execFactory) ConstructLookupJoin( isFirstJoinInPairedJoiner: isFirstJoinInPairedJoiner, isSecondJoinInPairedJoiner: isSecondJoinInPairedJoiner, reqOrdering: ReqOrdering(reqOrdering), + limitHint: limitHint, } n.eqCols = make([]int, len(eqCols)) for i, c := range eqCols { diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 63f3ccacc54b..77d8dace4394 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -217,6 +217,10 @@ type joinReader struct { // used. lookupBatchBytesLimit rowinfra.BytesLimit + // limitHintHelper is used in limiting batches of input rows in the presence + // of hard and soft limits. + limitHintHelper execinfra.LimitHintHelper + // scanStats is collected from the trace after we finish doing work for this // join. scanStats execinfra.ScanStats @@ -316,6 +320,7 @@ func newJoinReader( lockWaitPolicy: row.GetWaitPolicy(spec.LockingWaitPolicy), usesStreamer: useStreamer, lookupBatchBytesLimit: rowinfra.BytesLimit(spec.LookupBatchBytesLimit), + limitHintHelper: execinfra.MakeLimitHintHelper(spec.LimitHint, post), } if readerType != indexJoinReaderType { jr.groupingState = &inputBatchGroupingState{doGrouping: spec.LeftJoinWithPairedJoiner} @@ -751,6 +756,10 @@ func (jr *joinReader) readInput() ( return jrStateUnknown, nil, jr.DrainHelper() } jr.scratchInputRows = append(jr.scratchInputRows, jr.rowAlloc.CopyRow(encDatumRow)) + + if l := jr.limitHintHelper.LimitHint(); l != 0 && l == int64(len(jr.scratchInputRows)) { + break + } } if err := jr.performMemoryAccounting(); err != nil { @@ -780,6 +789,11 @@ func (jr *joinReader) readInput() ( jr.updateGroupingStateForNonEmptyBatch() } + if err := jr.limitHintHelper.ReadSomeRows(int64(len(jr.scratchInputRows))); err != nil { + jr.MoveToDraining(err) + return jrStateUnknown, nil, jr.DrainHelper() + } + // Figure out what key spans we need to lookup. spans, err := jr.strategy.processLookupRows(jr.scratchInputRows) if err != nil { From c419ba448e941c9a76e2d35a67327f6e9afc2e78 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 23 Mar 2022 14:14:34 -0700 Subject: [PATCH 2/2] colexecjoin: optimize building output on the left in cross joiner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit updates the way we're building output in the cross joiner from the left input (also used by the merge joiner when building from the buffered group). There, we need to repeat a single tuple `toAppend` times, so we do it in a loop. This commit adds the optimization of using `Bytes.Copy` for the bytes-like types as well as BCE for sliceable types. ``` name old speed new speed delta CrossJoiner/spillForced=false/type=INNER/rows=1-24 1.01MB/s ± 1% 1.00MB/s ± 2% -1.18% (p=0.013 n=10+10) CrossJoiner/spillForced=false/type=INNER/rows=16-24 207MB/s ± 0% 205MB/s ± 2% -0.76% (p=0.023 n=10+10) CrossJoiner/spillForced=false/type=INNER/rows=256-24 6.77GB/s ± 1% 7.78GB/s ± 0% +14.92% (p=0.000 n=10+8) CrossJoiner/spillForced=false/type=INNER/rows=2048-24 8.87GB/s ± 1% 10.33GB/s ± 0% +16.55% (p=0.000 n=9+9) CrossJoiner/spillForced=false/type=INNER/rows=8192-24 8.92GB/s ± 1% 10.39GB/s ± 1% +16.52% (p=0.000 n=10+10) ``` ``` name old speed new speed delta MergeJoiner/rows=32-24 34.7MB/s ± 2% 34.6MB/s ± 3% ~ (p=0.896 n=10+10) MergeJoiner/rows=512-24 94.7MB/s ± 3% 94.6MB/s ± 2% ~ (p=0.619 n=10+9) MergeJoiner/rows=4096-24 235MB/s ± 1% 233MB/s ± 1% -0.94% (p=0.004 n=9+10) MergeJoiner/rows=32768-24 341MB/s ± 3% 340MB/s ± 2% ~ (p=0.315 n=10+10) MergeJoiner/oneSideRepeat-rows=32-24 44.1MB/s ± 2% 44.1MB/s ± 3% ~ (p=0.839 n=10+10) MergeJoiner/oneSideRepeat-rows=512-24 252MB/s ± 2% 262MB/s ± 2% +3.93% (p=0.000 n=10+10) MergeJoiner/oneSideRepeat-rows=4096-24 904MB/s ± 1% 953MB/s ± 2% +5.50% (p=0.000 n=10+10) MergeJoiner/oneSideRepeat-rows=32768-24 1.45GB/s ± 1% 1.53GB/s ± 2% +5.43% (p=0.000 n=10+9) MergeJoiner/bothSidesRepeat-rows=32-24 27.1MB/s ± 1% 27.2MB/s ± 2% ~ (p=0.722 n=10+10) MergeJoiner/bothSidesRepeat-rows=512-24 124MB/s ± 3% 127MB/s ± 2% +2.55% (p=0.001 n=10+10) MergeJoiner/bothSidesRepeat-rows=4096-24 150MB/s ± 1% 152MB/s ± 1% +1.25% (p=0.000 n=10+10) MergeJoiner/bothSidesRepeat-rows=32768-24 84.8MB/s ± 1% 86.4MB/s ± 0% +1.93% (p=0.000 n=10+10) ``` Release note: None --- pkg/sql/colexec/colexecjoin/crossjoiner.eg.go | 174 ++++++++++-------- .../colexec/colexecjoin/crossjoiner_tmpl.go | 37 +++- 2 files changed, 130 insertions(+), 81 deletions(-) diff --git a/pkg/sql/colexec/colexecjoin/crossjoiner.eg.go b/pkg/sql/colexec/colexecjoin/crossjoiner.eg.go index 7514686a8d94..739b37a44b94 100644 --- a/pkg/sql/colexec/colexecjoin/crossjoiner.eg.go +++ b/pkg/sql/colexec/colexecjoin/crossjoiner.eg.go @@ -112,14 +112,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -138,8 +140,7 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i if srcNulls.NullAt(srcStartIdx) { outNulls.SetNull(outStartIdx) } else { - val := srcCol.Get(srcStartIdx) - outCol.Set(outStartIdx, val) + outCol.Copy(srcCol, outStartIdx, srcStartIdx) } outStartIdx++ bs.curSrcStartIdx++ @@ -168,14 +169,12 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { - val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + outCol.Copy(srcCol, outStartIdx+i, srcStartIdx) } } + outStartIdx += toAppend } } } @@ -224,14 +223,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -279,14 +280,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } case 32: @@ -331,14 +334,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } case -1: @@ -384,14 +389,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -440,14 +447,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -496,14 +505,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -552,14 +563,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -578,8 +591,7 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i if srcNulls.NullAt(srcStartIdx) { outNulls.SetNull(outStartIdx) } else { - val := srcCol.Get(srcStartIdx) - outCol.Set(outStartIdx, val) + outCol.Copy(srcCol, outStartIdx, srcStartIdx) } outStartIdx++ bs.curSrcStartIdx++ @@ -608,14 +620,12 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { - val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + outCol.Copy(srcCol, outStartIdx+i, srcStartIdx) } } + outStartIdx += toAppend } } } @@ -664,14 +674,13 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + outCol.Set(outStartIdx+i, val) } } + outStartIdx += toAppend } } } @@ -751,14 +760,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -777,8 +788,7 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i if srcNulls.NullAt(srcStartIdx) { outNulls.SetNull(outStartIdx) } else { - val := srcCol.Get(srcStartIdx) - outCol.Set(outStartIdx, val) + outCol.Copy(srcCol, outStartIdx, srcStartIdx) } outStartIdx++ bs.curSrcStartIdx++ @@ -807,14 +817,12 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { - val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + outCol.Copy(srcCol, outStartIdx+i, srcStartIdx) } } + outStartIdx += toAppend } } } @@ -863,14 +871,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -918,14 +928,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } case 32: @@ -970,14 +982,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } case -1: @@ -1023,14 +1037,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -1079,14 +1095,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -1135,14 +1153,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -1191,14 +1211,16 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + //gcassert:bce + outCol.Set(i, val) } } + outStartIdx += toAppend } } } @@ -1217,8 +1239,7 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i if srcNulls.NullAt(srcStartIdx) { outNulls.SetNull(outStartIdx) } else { - val := srcCol.Get(srcStartIdx) - outCol.Set(outStartIdx, val) + outCol.Copy(srcCol, outStartIdx, srcStartIdx) } outStartIdx++ bs.curSrcStartIdx++ @@ -1247,14 +1268,12 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { - val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + outCol.Copy(srcCol, outStartIdx+i, srcStartIdx) } } + outStartIdx += toAppend } } } @@ -1303,14 +1322,13 @@ func (b *crossJoinerBase) buildFromLeftInput(ctx context.Context, destStartIdx i } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { val := srcCol.Get(srcStartIdx) for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + outCol.Set(outStartIdx+i, val) } } + outStartIdx += toAppend } } } diff --git a/pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go b/pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go index e01a6da7316e..0f2dbf86c02c 100644 --- a/pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go +++ b/pkg/sql/colexec/colexecjoin/crossjoiner_tmpl.go @@ -87,8 +87,12 @@ func buildFromLeftBatch(b *crossJoinerBase, currentBatch coldata.Batch, sel []in if srcNulls.NullAt(srcStartIdx) { outNulls.SetNull(outStartIdx) } else { + // {{if .IsBytesLike}} + outCol.Copy(srcCol, outStartIdx, srcStartIdx) + // {{else}} val := srcCol.Get(srcStartIdx) outCol.Set(outStartIdx, val) + // {{end}} } outStartIdx++ bs.curSrcStartIdx++ @@ -102,6 +106,7 @@ func buildFromLeftBatch(b *crossJoinerBase, currentBatch coldata.Batch, sel []in } else { srcStartIdx = bs.curSrcStartIdx } + // {{/* toAppend will always be positive. */}} toAppend := leftNumRepeats - bs.numRepeatsIdx if outStartIdx+toAppend > outputCapacity { // We don't have enough space to repeat the current @@ -120,14 +125,40 @@ func buildFromLeftBatch(b *crossJoinerBase, currentBatch coldata.Batch, sel []in } if srcNulls.NullAt(srcStartIdx) { outNulls.SetNullRange(outStartIdx, outStartIdx+toAppend) - outStartIdx += toAppend } else { + // {{if not .IsBytesLike}} + // {{if .Sliceable}} + outCol := outCol[outStartIdx:] + _ = outCol[toAppend-1] + // {{end}} val := srcCol.Get(srcStartIdx) + // {{end}} for i := 0; i < toAppend; i++ { - outCol.Set(outStartIdx, val) - outStartIdx++ + // {{if .IsBytesLike}} + outCol.Copy(srcCol, outStartIdx+i, srcStartIdx) + // {{else}} + // {{if .Sliceable}} + // {{/* + // For the sliceable types, we sliced outCol + // to start at outStartIdx, so we use index + // i directly. + // */}} + //gcassert:bce + outCol.Set(i, val) + // {{else}} + // {{/* + // For the non-sliceable types, outCol + // vector is the original one (i.e. without + // an adjustment), so we need to add + // outStartIdx to set the element at the + // correct index. + // */}} + outCol.Set(outStartIdx+i, val) + // {{end}} + // {{end}} } } + outStartIdx += toAppend } } // {{end}}