diff --git a/Makefile b/Makefile index e7953246dea4..7bdcfabcbdea 100644 --- a/Makefile +++ b/Makefile @@ -888,6 +888,8 @@ EXECGEN_TARGETS = \ pkg/sql/colexec/colexecjoin/mergejoiner_rightanti.eg.go \ pkg/sql/colexec/colexecjoin/mergejoiner_rightouter.eg.go \ pkg/sql/colexec/colexecjoin/mergejoiner_rightsemi.eg.go \ + pkg/sql/colexec/colexecjoin/span_assembler.eg.go \ + pkg/sql/colexec/colexecjoin/span_encoder.eg.go \ pkg/sql/colexec/colexecproj/default_cmp_proj_ops.eg.go \ pkg/sql/colexec/colexecproj/proj_const_left_ops.eg.go \ pkg/sql/colexec/colexecproj/proj_const_right_ops.eg.go \ diff --git a/pkg/col/coldata/bytes.go b/pkg/col/coldata/bytes.go index bb1cfe73ef20..df8c4c0067aa 100644 --- a/pkg/col/coldata/bytes.go +++ b/pkg/col/coldata/bytes.go @@ -410,6 +410,15 @@ func (b *Bytes) Reset() { b.maxSetLength = 0 } +// ResetForAppend is similar to Reset, but is also resets the offsets slice so +// that future calls to AppendSlice or AppendVal will append starting from index +// zero. TODO(drewk): once Set is removed, this can just be Reset. +func (b *Bytes) ResetForAppend() { + b.Reset() + // The first offset indicates where the first element will start. + b.offsets = b.offsets[:1] +} + // String is used for debugging purposes. func (b *Bytes) String() string { var builder strings.Builder diff --git a/pkg/keys/printer.go b/pkg/keys/printer.go index 4ee72289f997..1a345bc7eed4 100644 --- a/pkg/keys/printer.go +++ b/pkg/keys/printer.go @@ -706,7 +706,7 @@ func init() { // PrettyPrintRange pretty prints a compact representation of a key range. The // output is of the form: // commonPrefix{remainingStart-remainingEnd} -// If the end key is empty, the outut is of the form: +// If the end key is empty, the output is of the form: // start // It prints at most maxChars, truncating components as needed. See // TestPrettyPrintRange for some examples. diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index c864bb5a986c..bba4785b8945 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -177,6 +177,20 @@ func supportedNatively(spec *execinfrapb.ProcessorSpec) error { } return nil + case spec.Core.JoinReader != nil: + if spec.Core.JoinReader.LookupColumns != nil || !spec.Core.JoinReader.LookupExpr.Empty() { + return errors.Newf("lookup join reader is unsupported in vectorized") + } + for i := range spec.Core.JoinReader.Table.Indexes { + if spec.Core.JoinReader.Table.Indexes[i].IsInterleaved() { + // Interleaved indexes are going to be removed anyway, so there is no + // point in handling the extra complexity. Just let the row engine + // handle this. + return errors.Newf("vectorized join reader is unsupported for interleaved indexes") + } + } + return nil + case spec.Core.Filterer != nil: return nil @@ -781,6 +795,34 @@ func NewColOperator( result.ColumnTypes = scanOp.ResultTypes result.ToClose = append(result.ToClose, scanOp) + case core.JoinReader != nil: + if err := checkNumIn(inputs, 1); err != nil { + return r, err + } + if core.JoinReader.LookupColumns != nil || !core.JoinReader.LookupExpr.Empty() { + return r, errors.Newf("lookup join reader is unsupported in vectorized") + } + + semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn) + inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes)) + copy(inputTypes, spec.Input[0].ColumnTypes) + indexJoinOp, err := colfetcher.NewColIndexJoin( + ctx, streamingAllocator, flowCtx, evalCtx, semaCtx, + inputs[0].Root, core.JoinReader, post, inputTypes) + if err != nil { + return r, err + } + result.Root = indexJoinOp + if util.CrdbTestBuild { + result.Root = colexec.NewInvariantsChecker(result.Root) + } + result.KVReader = indexJoinOp + result.MetadataSources = append(result.MetadataSources, result.Root.(colexecop.MetadataSource)) + result.Releasables = append(result.Releasables, indexJoinOp) + result.Root = colexecutils.NewCancelChecker(result.Root) + result.ColumnTypes = indexJoinOp.ResultTypes + result.ToClose = append(result.ToClose, indexJoinOp) + case core.Filterer != nil: if err := checkNumIn(inputs, 1); err != nil { return r, err diff --git a/pkg/sql/colexec/colexecjoin/BUILD.bazel b/pkg/sql/colexec/colexecjoin/BUILD.bazel index 59f4c062885c..b7e0b1aefadb 100644 --- a/pkg/sql/colexec/colexecjoin/BUILD.bazel +++ b/pkg/sql/colexec/colexecjoin/BUILD.bazel @@ -17,6 +17,9 @@ go_library( "//pkg/col/coldata", "//pkg/col/coldataext", # keep "//pkg/col/typeconv", + "//pkg/keys", # keep + "//pkg/roachpb", # keep + "//pkg/sql/catalog", # keep "//pkg/sql/catalog/descpb", "//pkg/sql/colcontainer", "//pkg/sql/colexec/colexecbase", @@ -27,10 +30,12 @@ go_library( "//pkg/sql/colmem", "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", + "//pkg/sql/rowenc", # keep "//pkg/sql/sem/tree", # keep "//pkg/sql/types", "//pkg/util", "//pkg/util/duration", # keep + "//pkg/util/encoding", # keep "//pkg/util/json", # keep "//pkg/util/mon", "@com_github_cockroachdb_apd_v2//:apd", # keep @@ -45,25 +50,35 @@ go_test( "dep_test.go", "main_test.go", "mergejoiner_test.go", + "span_assembler_test.go", ], embed = [":colexecjoin"], deps = [ "//pkg/col/coldata", "//pkg/col/coldataext", "//pkg/col/coldatatestutils", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/security", "//pkg/settings/cluster", + "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/tabledesc", + "//pkg/sql/colconv", "//pkg/sql/colexec/colexectestutils", "//pkg/sql/colexecerror", "//pkg/sql/colexecop", "//pkg/sql/colmem", "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", + "//pkg/sql/rowenc", "//pkg/sql/sem/tree", + "//pkg/sql/span", "//pkg/sql/types", "//pkg/testutils/buildutil", "//pkg/testutils/colcontainerutils", "//pkg/testutils/skip", + "//pkg/util", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/mon", @@ -87,6 +102,8 @@ targets = [ ("mergejoiner_rightanti.eg.go", "mergejoiner_tmpl.go"), ("mergejoiner_rightouter.eg.go", "mergejoiner_tmpl.go"), ("mergejoiner_rightsemi.eg.go", "mergejoiner_tmpl.go"), + ("span_assembler.eg.go", "span_assembler_tmpl.go"), + ("span_encoder.eg.go", "span_encoder_tmpl.go"), ] # Define a file group for all the .eg.go targets. diff --git a/pkg/sql/colexec/colexecjoin/span_assembler.eg.go b/pkg/sql/colexec/colexecjoin/span_assembler.eg.go new file mode 100644 index 000000000000..62a788ce6e71 --- /dev/null +++ b/pkg/sql/colexec/colexecjoin/span_assembler.eg.go @@ -0,0 +1,385 @@ +// Code generated by execgen; DO NOT EDIT. +// Copyright 2021 The Cockroach Authors. +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colexecjoin + +import ( + "sync" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/encoding" +) + +// NewColSpanAssembler returns a ColSpanAssembler operator that is able to +// generate lookup spans from input batches. +func NewColSpanAssembler( + codec keys.SQLCodec, + allocator *colmem.Allocator, + table catalog.TableDescriptor, + index catalog.Index, + inputTypes []*types.T, + neededCols util.FastIntSet, + sizeLimit int, +) ColSpanAssembler { + colFamStartKeys, colFamEndKeys := getColFamilyEncodings(neededCols, table, index) + + // Add span encoders to encode each primary key column as bytes. The + // ColSpanAssembler will later append these together to form valid spans. + var spanEncoders []spanEncoder + for i := 0; i < index.NumKeyColumns(); i++ { + asc := index.GetKeyColumnDirection(i) == descpb.IndexDescriptor_ASC + spanEncoders = append(spanEncoders, newSpanEncoder(allocator, inputTypes[i], asc, i)) + } + + b := spanAssemblerPool.Get().(*spanAssemblerBase) + + base := spanAssemblerBase{ + allocator: allocator, + spans: b.spans[:0], + keyPrefix: rowenc.MakeIndexKeyPrefix(codec, table, index.GetID()), + spanEncoders: spanEncoders, + spanCols: make([]*coldata.Bytes, len(spanEncoders)), + colFamStartKeys: colFamStartKeys, + colFamEndKeys: colFamEndKeys, + sizeLimit: sizeLimit, + } + + if len(colFamStartKeys) == 0 { + return &spanAssemblerNoColFamily{spanAssemblerBase: base} + } + return &spanAssemblerWithColFamily{spanAssemblerBase: base} +} + +var spanAssemblerPool = sync.Pool{ + New: func() interface{} { + return &spanAssemblerBase{} + }, +} + +// ColSpanAssembler is a utility operator that generates a series of spans from +// input batches which can be used to perform an index join or lookup join. +type ColSpanAssembler interface { + execinfra.Releasable + + // ConsumeBatch generates lookup spans from input batches and stores them to + // later be returned by GetSpans. Once the size limit has been reached, + // ConsumeBatch returns true. Note that it is up to the caller to choose + // whether to respect the size limit. + ConsumeBatch(coldata.Batch) (reachedLimit bool) + + // GetSpans returns the set of spans that have been generated so far. The + // returned Spans object is still owned by the SpanAssembler, so it cannot be + // modified. GetSpans will return nil if it is called before ConsumeBatch. + // Note that subsequent calls to GetSpans will invalidate the spans returned + // by previous calls, since the spans slice is reused (although the underlying + // key bytes are still valid). A caller that wishes to hold on to spans over + // the course of multiple calls should perform a shallow copy of the Spans. + GetSpans() roachpb.Spans + + // Close closes the ColSpanAssembler operator. + Close() +} + +// spanAssemblerBase extracts common fields between the SpanAssembler operators. +type spanAssemblerBase struct { + allocator *colmem.Allocator + + // spans is the list of spans that have been assembled so far. spans is owned + // and reset upon each call to GetSpans by the SpanAssembler operator. + spans roachpb.Spans + + // keyPrefix contains the portion of the encoding that is shared between all + // spans - namely, table, etc. + keyPrefix roachpb.Key + + // scratchKey is a scratch space used to append the key prefix and the key + // column encodings. It is reused for each span. + scratchKey roachpb.Key + + // spanEncoders is an ordered list of utility operators that encode each key + // column in vectorized fashion. + spanEncoders []spanEncoder + + // spanCols is used to iterate through the input columns that contain the + // key encodings during span construction. + spanCols []*coldata.Bytes + + // colFamStartKeys and colFamEndKeys is the list of start and end key suffixes + // for the column families that should be scanned. The spans will be split to + // scan over each family individually. Note that it is not necessarily + // possible to break a span into family scans. + colFamStartKeys, colFamEndKeys []roachpb.Key + + // maxSpansLength tracks the largest length the spans field reaches, so that + // all the Span objects can be deeply reset upon a call to close. + maxSpansLength int + + // sizeLimit is a limit on the number of bytes that can be allocated for the + // span keys before the spans are returned. It is a field instead of a + // constant so that it can be modified in tests. + sizeLimit int + + // shouldReset tracks whether ConsumeBatch needs to reset the spans slice + // before appending to it. + shouldReset bool +} + +type spanAssemblerNoColFamily struct { + spanAssemblerBase +} + +var _ ColSpanAssembler = &spanAssemblerNoColFamily{} + +// ConsumeBatch implements the ColSpanAssembler interface. +func (op *spanAssemblerNoColFamily) ConsumeBatch(batch coldata.Batch) (reachedLimit bool) { + n := batch.Length() + if n == 0 { + return false /* reachedLimit */ + } + + if op.shouldReset { + if len(op.spans) > op.maxSpansLength { + // Ensure that all initialized spans are deeply reset upon a call to Close. + op.maxSpansLength = len(op.spans) + } + op.spans = op.spans[:0] + } + op.shouldReset = false + + for i := range op.spanEncoders { + op.spanCols[i] = op.spanEncoders[i].next(batch) + } + + // TODO(drewk): currently we have to allocate each key individually because + // makeKVBatchFetcherWithSendFunc doesn't copy the underlying bytes of the + // spans. Consider making the batch fetcher perform a deep copy so we can + // store all keys in a flat byte slice, and reset and reuse it for each batch. + var spanBytes int + oldSpansCap := cap(op.spans) + for i := 0; i < n; i++ { + // Every key has a prefix encoding the table, index, etc. + op.scratchKey = append(op.scratchKey[:0], op.keyPrefix...) + for j := range op.spanCols { + // The encoding for each primary key column has previously been + // calculated and stored in an input column. + op.scratchKey = append(op.scratchKey, op.spanCols[j].Get(i)...) + } + { + // The spans cannot be split into column family spans, so there will be + // exactly one span for each input row. + var span roachpb.Span + span.Key = make(roachpb.Key, 0, len(op.scratchKey)) + span.Key = append(span.Key, op.scratchKey...) + spanBytes += len(span.Key) + span.EndKey = make(roachpb.Key, 0, len(op.scratchKey)+1) + span.EndKey = append(span.EndKey, op.scratchKey...) + // TODO(drewk): change this to use PrefixEnd() when interleaved indexes are + // permanently removed. Keep it this way for now to allow testing + // against the row engine, even though the vectorized index joiner doesn't + // allow interleaved indexes. + span.EndKey = encoding.EncodeInterleavedSentinel(span.EndKey) + spanBytes += len(span.EndKey) + op.spans = append(op.spans, span) + } + } + + spansSizeInc := (cap(op.spans) - oldSpansCap) * int(spanSize) + op.allocator.AdjustMemoryUsage(int64(spanBytes + spansSizeInc)) + return spanBytes >= op.sizeLimit +} + +type spanAssemblerWithColFamily struct { + spanAssemblerBase +} + +var _ ColSpanAssembler = &spanAssemblerWithColFamily{} + +// ConsumeBatch implements the ColSpanAssembler interface. +func (op *spanAssemblerWithColFamily) ConsumeBatch(batch coldata.Batch) (reachedLimit bool) { + n := batch.Length() + if n == 0 { + return false /* reachedLimit */ + } + + if op.shouldReset { + if len(op.spans) > op.maxSpansLength { + // Ensure that all initialized spans are deeply reset upon a call to Close. + op.maxSpansLength = len(op.spans) + } + op.spans = op.spans[:0] + } + op.shouldReset = false + + for i := range op.spanEncoders { + op.spanCols[i] = op.spanEncoders[i].next(batch) + } + + // TODO(drewk): currently we have to allocate each key individually because + // makeKVBatchFetcherWithSendFunc doesn't copy the underlying bytes of the + // spans. Consider making the batch fetcher perform a deep copy so we can + // store all keys in a flat byte slice, and reset and reuse it for each batch. + var spanBytes int + oldSpansCap := cap(op.spans) + for i := 0; i < n; i++ { + // Every key has a prefix encoding the table, index, etc. + op.scratchKey = append(op.scratchKey[:0], op.keyPrefix...) + for j := range op.spanCols { + // The encoding for each primary key column has previously been + // calculated and stored in an input column. + op.scratchKey = append(op.scratchKey, op.spanCols[j].Get(i)...) + } + { + // The span for each row can be split into a series of column family spans, + // which have the column family ID as a suffix. Individual column family + // spans can be served as Get requests, which are more efficient than Scan + // requests. + for j := range op.colFamStartKeys { + var span roachpb.Span + span.Key = make(roachpb.Key, 0, len(op.scratchKey)+len(op.colFamStartKeys[j])) + span.Key = append(span.Key, op.scratchKey...) + span.Key = append(span.Key, op.colFamStartKeys[j]...) + spanBytes += len(span.Key) + // The end key may be nil, in which case the span is a point lookup. + if len(op.colFamEndKeys[j]) > 0 { + span.EndKey = make(roachpb.Key, 0, len(op.scratchKey)+len(op.colFamEndKeys[j])) + span.EndKey = append(span.EndKey, op.scratchKey...) + span.EndKey = append(span.EndKey, op.colFamEndKeys[j]...) + spanBytes += len(span.EndKey) + } + op.spans = append(op.spans, span) + } + } + } + + spansSizeInc := (cap(op.spans) - oldSpansCap) * int(spanSize) + op.allocator.AdjustMemoryUsage(int64(spanBytes + spansSizeInc)) + return spanBytes >= op.sizeLimit +} + +const spanSize = unsafe.Sizeof(roachpb.Span{}) + +// GetSpans implements the ColSpanAssembler interface. +func (b *spanAssemblerBase) GetSpans() roachpb.Spans { + if b.shouldReset { + // No spans have been generated since the last call to GetSpans. + return nil + } + b.shouldReset = true + return b.spans +} + +// Close implements the ColSpanAssembler interface. +func (b *spanAssemblerBase) Close() { + for i := range b.spanEncoders { + b.spanEncoders[i].close() + } +} + +// Release implements the ColSpanAssembler interface. +func (b *spanAssemblerBase) Release() { + for i := range b.spanCols { + // Release references to input columns. + b.spanCols[i] = nil + } + b.spans = b.spans[:b.maxSpansLength] + for i := range b.spans { + // Deeply reset all spans that were initialized during execution. + b.spans[i] = roachpb.Span{} + } + + *b = spanAssemblerBase{ + spans: b.spans[:0], + } + spanAssemblerPool.Put(b) +} + +// execgen:inline +const _ = "template_constructSpans" + +// getColFamilyEncodings returns two lists of keys of the same length. Each pair +// of keys at the same index corresponds to the suffixes of the start and end +// keys of a span over a specific column family (or adjacent column families). +// If the returned lists are empty, the spans cannot be split into separate +// family spans. +func getColFamilyEncodings( + neededCols util.FastIntSet, table catalog.TableDescriptor, index catalog.Index, +) (startKeys, endKeys []roachpb.Key) { + familyIDs := rowenc.NeededColumnFamilyIDs(neededCols, table, index) + + if !canSplitSpans(len(familyIDs), table, index) { + return nil, nil + } + + for i, familyID := range familyIDs { + var key roachpb.Key + key = keys.MakeFamilyKey(key, uint32(familyID)) + if i > 0 && familyID-1 == familyIDs[i-1] && endKeys != nil { + // This column family is adjacent to the previous one. We can merge + // the two spans into one. + endKeys[len(endKeys)-1] = key.PrefixEnd() + } else { + startKeys = append(startKeys, key) + endKeys = append(endKeys, nil) + } + } + return startKeys, endKeys +} + +// canSplitSpans returns true if the spans that will be generated by the +// SpanAssembler operator can be split into spans over individual column +// families. For index joins, either all spans can be split or none can because +// the lookup columns are never nullable (null values prevent the index key from +// being fully knowable). +func canSplitSpans(numNeededFamilies int, table catalog.TableDescriptor, index catalog.Index) bool { + // We can only split a span into separate family specific point lookups if: + // * The table is not a special system table. (System tables claim to have + // column families, but actually do not, since they're written to with + // raw KV puts in a "legacy" way.) + if table.GetID() > 0 && table.GetID() < keys.MaxReservedDescID { + return false + } + + // * The index either has just 1 family (so we'll make a GetRequest) or we + // need fewer than every column family in the table (otherwise we'd just + // make a big ScanRequest). + numFamilies := len(table.GetFamilies()) + if numFamilies > 1 && numNeededFamilies == numFamilies { + return false + } + + // Other requirements that are always satisfied by index joins, and therefore + // do not need to be checked: + // * The index is unique. + // * The index is fully constrained. + // * If we're looking at a secondary index... + // * The index constraint must not contain null, since that would cause the + // index key to not be completely knowable. + // * The index cannot be inverted. + // * The index must store some columns. + // * The index is a new enough version. + // We've passed all the conditions, and should be able to safely split this + // span into multiple column-family-specific spans. + return true +} + +// execgen:inline +const _ = "inlined_constructSpans_true" + +// execgen:inline +const _ = "inlined_constructSpans_false" diff --git a/pkg/sql/colexec/colexecjoin/span_assembler_test.go b/pkg/sql/colexec/colexecjoin/span_assembler_test.go new file mode 100644 index 000000000000..5e5e08de6f62 --- /dev/null +++ b/pkg/sql/colexec/colexecjoin/span_assembler_test.go @@ -0,0 +1,241 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colexecjoin + +import ( + "context" + "fmt" + "reflect" + "testing" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/col/coldataext" + "github.com/cockroachdb/cockroach/pkg/col/coldatatestutils" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/colconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/span" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestSpanAssembler(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + evalCtx := tree.MakeTestingEvalContext(st) + testMemMonitor = execinfra.NewTestMemMonitor(ctx, st) + defer testMemMonitor.Stop(ctx) + nTuples := 3 * coldata.BatchSize() + memAcc := testMemMonitor.MakeBoundAccount() + testMemAcc = &memAcc + testColumnFactory = coldataext.NewExtendedColumnFactory(&evalCtx) + testAllocator = colmem.NewAllocator(ctx, testMemAcc, testColumnFactory) + defer testMemAcc.Close(ctx) + rng, _ := randutil.NewPseudoRand() + typs := []*types.T{types.Int, types.Bytes, types.Decimal} + + for _, useColFamilies := range []bool{true, false} { + name := "WithColFamilies" + if !useColFamilies { + name = "NoColFamilies" + } + t.Run(name, func(t *testing.T) { + for _, sizeLimit := range []int{ + 1, // 1 byte + 1 << 10, // 1 KB + 1 << 20, // 1 MB + } { + t.Run(fmt.Sprintf("sizeLimit=%d", sizeLimit), func(t *testing.T) { + for _, useSel := range []bool{true, false} { + t.Run(fmt.Sprintf("sel=%v", useSel), func(t *testing.T) { + probOfOmittingRow := 0.0 + if useSel { + probOfOmittingRow = 0.3 + } + sel := coldatatestutils.RandomSel(rng, coldata.BatchSize(), probOfOmittingRow) + testTable := makeTable(useColFamilies) + neededColumns := util.MakeFastIntSet(1, 2, 3, 4) + + cols := make([]coldata.Vec, len(typs)) + for i, typ := range typs { + cols[i] = testAllocator.NewMemColumn(typ, nTuples) + } + for i := range typs { + coldatatestutils.RandomVec(coldatatestutils.RandomVecArgs{ + Rand: rng, + Vec: cols[i], + N: nTuples, + NullProbability: 0, // Primary key columns are non-null. + }) + } + source := colexectestutils.NewChunkingBatchSource(testAllocator, typs, cols, nTuples) + source.Init(ctx) + oracleSource := colexectestutils.NewChunkingBatchSource(testAllocator, typs, cols, nTuples) + oracleSource.Init(ctx) + converter := colconv.NewAllVecToDatumConverter(len(typs)) + + builder := span.MakeBuilder(&evalCtx, keys.TODOSQLCodec, testTable, testTable.GetPrimaryIndex()) + builder.SetNeededColumns(neededColumns) + + colBuilder := NewColSpanAssembler(keys.TODOSQLCodec, testAllocator, testTable, + testTable.GetPrimaryIndex(), typs, neededColumns, sizeLimit) + defer func() { + colBuilder.Close() + colBuilder.Release() + }() + + var testSpans roachpb.Spans + for batch := source.Next(); ; batch = source.Next() { + if batch.Length() == 0 { + // Reached the end of the input. + testSpans = append(testSpans, colBuilder.GetSpans()...) + break + } + if useSel { + batch.SetSelection(true) + copy(batch.Selection(), sel) + batch.SetLength(len(sel)) + } + if colBuilder.ConsumeBatch(batch) { + // Reached the memory limit. + testSpans = append(testSpans, colBuilder.GetSpans()...) + } + } + + var oracleSpans roachpb.Spans + for batch := oracleSource.Next(); batch.Length() > 0; batch = oracleSource.Next() { + batch.SetSelection(true) + copy(batch.Selection(), sel) + batch.SetLength(len(sel)) + converter.ConvertBatchAndDeselect(batch) + rows := make(rowenc.EncDatumRows, len(sel)) + for i := range sel { + // Note that sel contains all rows if useSel=false. + row := make(rowenc.EncDatumRow, len(typs)) + for j := range typs { + datum := converter.GetDatumColumn(j)[i] + row[j] = rowenc.DatumToEncDatum(typs[j], datum) + } + rows[i] = row + } + oracleSpans = append(oracleSpans, spanGeneratorOracle(t, builder, rows, len(typs))...) + } + + if len(oracleSpans) != len(testSpans) { + t.Fatalf("Expected %d spans, got %d.", len(oracleSpans), len(testSpans)) + } + for i := range oracleSpans { + oracleSpan := oracleSpans[i] + testSpan := testSpans[i] + if !reflect.DeepEqual(oracleSpan, testSpan) { + t.Fatalf("Span at index %d incorrect.\n\nExpected:\n%v\n\nFound:\n%v\n", + i, oracleSpan, testSpan) + } + } + }) + } + }) + } + }) + } +} + +// spanGeneratorOracle extracts the logic from joinreader_span_generator.go that +// pertains to index joins. +func spanGeneratorOracle( + t *testing.T, spanBuilder *span.Builder, rows []rowenc.EncDatumRow, lookupCols int, +) roachpb.Spans { + var spans roachpb.Spans + for _, inputRow := range rows { + generatedSpan, containsNull, err := spanBuilder.SpanFromEncDatums(inputRow, lookupCols) + if err != nil { + t.Fatal(err) + } + spans = spanBuilder.MaybeSplitSpanIntoSeparateFamilies( + spans, generatedSpan, lookupCols, containsNull) + } + return spans +} + +func makeTable(useColFamilies bool) catalog.TableDescriptor { + tableID := keys.MinNonPredefinedUserDescID + if !useColFamilies { + // We can prevent the span builder from splitting spans into separate column + // families by using a system table ID, since system tables do not have + // column families. + tableID = keys.SystemDatabaseID + } + + var testTableDesc = descpb.TableDescriptor{ + Name: "abcd", + ID: descpb.ID(tableID), + Privileges: descpb.NewDefaultPrivilegeDescriptor(security.AdminRoleName()), + Version: 1, + Columns: []descpb.ColumnDescriptor{ + {Name: "a", ID: 1, Type: types.Int}, + {Name: "b", ID: 2, Type: types.Bytes}, + {Name: "c", ID: 3, Type: types.Decimal}, + {Name: "d", ID: 4, Type: types.Int}, + }, + NextColumnID: 5, + Families: []descpb.ColumnFamilyDescriptor{ + {Name: "primary", ID: 0, ColumnNames: []string{"a", "b", "d"}, ColumnIDs: []descpb.ColumnID{1, 2, 4}}, + {Name: "secondary", ID: 1, ColumnNames: []string{"c"}, ColumnIDs: []descpb.ColumnID{3}}, + }, + NextFamilyID: 2, + PrimaryIndex: descpb.IndexDescriptor{ + Name: "primary", + ID: 1, + Unique: true, + KeyColumnNames: []string{"a", "b", "c"}, + KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC, descpb.IndexDescriptor_ASC, descpb.IndexDescriptor_ASC}, + KeyColumnIDs: []descpb.ColumnID{1, 2, 3}, + }, + Indexes: []descpb.IndexDescriptor{ + { // Secondary index omits column 'd'. + Name: "secondary", + ID: 2, + Unique: true, + KeyColumnNames: []string{"c", "a", "b"}, + KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC, descpb.IndexDescriptor_ASC, descpb.IndexDescriptor_ASC}, + KeyColumnIDs: []descpb.ColumnID{3, 1, 2}, + KeySuffixColumnIDs: []descpb.ColumnID{1, 2}, + }, + }, + NextIndexID: 3, + FormatVersion: descpb.FamilyFormatVersion, + NextMutationID: 1, + } + + ctx := context.Background() + b := tabledesc.NewBuilder(&testTableDesc) + err := b.RunPostDeserializationChanges(ctx, nil /* DescGetter */) + if err != nil { + log.Fatalf(ctx, "Error when building descriptor of system table %q: %s", testTableDesc.Name, err) + } + return b.BuildImmutableTable() +} diff --git a/pkg/sql/colexec/colexecjoin/span_assembler_tmpl.go b/pkg/sql/colexec/colexecjoin/span_assembler_tmpl.go new file mode 100644 index 000000000000..2ce6496d8ce4 --- /dev/null +++ b/pkg/sql/colexec/colexecjoin/span_assembler_tmpl.go @@ -0,0 +1,355 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// {{/* +// +build execgen_template +// +// This file is the execgen template for span_encoder.eg.go. It's formatted in a +// special way, so it's both valid Go and a valid text/template input. This +// permits editing this file with editor support. +// +// */}} + +package colexecjoin + +import ( + "sync" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/encoding" +) + +// NewColSpanAssembler returns a ColSpanAssembler operator that is able to +// generate lookup spans from input batches. +func NewColSpanAssembler( + codec keys.SQLCodec, + allocator *colmem.Allocator, + table catalog.TableDescriptor, + index catalog.Index, + inputTypes []*types.T, + neededCols util.FastIntSet, + sizeLimit int, +) ColSpanAssembler { + colFamStartKeys, colFamEndKeys := getColFamilyEncodings(neededCols, table, index) + + // Add span encoders to encode each primary key column as bytes. The + // ColSpanAssembler will later append these together to form valid spans. + var spanEncoders []spanEncoder + for i := 0; i < index.NumKeyColumns(); i++ { + asc := index.GetKeyColumnDirection(i) == descpb.IndexDescriptor_ASC + spanEncoders = append(spanEncoders, newSpanEncoder(allocator, inputTypes[i], asc, i)) + } + + b := spanAssemblerPool.Get().(*spanAssemblerBase) + + base := spanAssemblerBase{ + allocator: allocator, + spans: b.spans[:0], + keyPrefix: rowenc.MakeIndexKeyPrefix(codec, table, index.GetID()), + spanEncoders: spanEncoders, + spanCols: make([]*coldata.Bytes, len(spanEncoders)), + colFamStartKeys: colFamStartKeys, + colFamEndKeys: colFamEndKeys, + sizeLimit: sizeLimit, + } + + if len(colFamStartKeys) == 0 { + return &spanAssemblerNoColFamily{spanAssemblerBase: base} + } + return &spanAssemblerWithColFamily{spanAssemblerBase: base} +} + +var spanAssemblerPool = sync.Pool{ + New: func() interface{} { + return &spanAssemblerBase{} + }, +} + +// ColSpanAssembler is a utility operator that generates a series of spans from +// input batches which can be used to perform an index join or lookup join. +type ColSpanAssembler interface { + execinfra.Releasable + + // ConsumeBatch generates lookup spans from input batches and stores them to + // later be returned by GetSpans. Once the size limit has been reached, + // ConsumeBatch returns true. Note that it is up to the caller to choose + // whether to respect the size limit. + ConsumeBatch(coldata.Batch) (reachedLimit bool) + + // GetSpans returns the set of spans that have been generated so far. The + // returned Spans object is still owned by the SpanAssembler, so it cannot be + // modified. GetSpans will return nil if it is called before ConsumeBatch. + // + // Note that subsequent calls to GetSpans will invalidate the spans returned + // by previous calls, since the spans slice is reused (although the underlying + // key bytes are still valid). A caller that wishes to hold on to spans over + // the course of multiple calls should perform a shallow copy of the Spans. + GetSpans() roachpb.Spans + + // Close closes the ColSpanAssembler operator. + Close() +} + +// spanAssemblerBase extracts common fields between the SpanAssembler operators. +type spanAssemblerBase struct { + allocator *colmem.Allocator + + // spans is the list of spans that have been assembled so far. spans is owned + // and reset upon each call to GetSpans by the SpanAssembler operator. + spans roachpb.Spans + + // keyPrefix contains the portion of the encoding that is shared between all + // spans - namely, table, etc. + keyPrefix roachpb.Key + + // scratchKey is a scratch space used to append the key prefix and the key + // column encodings. It is reused for each span. + scratchKey roachpb.Key + + // spanEncoders is an ordered list of utility operators that encode each key + // column in vectorized fashion. + spanEncoders []spanEncoder + + // spanCols is used to iterate through the input columns that contain the + // key encodings during span construction. + spanCols []*coldata.Bytes + + // colFamStartKeys and colFamEndKeys is the list of start and end key suffixes + // for the column families that should be scanned. The spans will be split to + // scan over each family individually. Note that it is not necessarily + // possible to break a span into family scans. + colFamStartKeys, colFamEndKeys []roachpb.Key + + // maxSpansLength tracks the largest length the spans field reaches, so that + // all the Span objects can be deeply reset upon a call to close. + maxSpansLength int + + // sizeLimit is a limit on the number of bytes that can be allocated for the + // span keys before the spans are returned. It is a field instead of a + // constant so that it can be modified in tests. + sizeLimit int + + // shouldReset tracks whether ConsumeBatch needs to reset the spans slice + // before appending to it. + shouldReset bool +} + +// {{range .}} + +type _OP_STRING struct { + spanAssemblerBase +} + +var _ ColSpanAssembler = &_OP_STRING{} + +// ConsumeBatch implements the ColSpanAssembler interface. +func (op *_OP_STRING) ConsumeBatch(batch coldata.Batch) (reachedLimit bool) { + n := batch.Length() + if n == 0 { + return false /* reachedLimit */ + } + + if op.shouldReset { + if len(op.spans) > op.maxSpansLength { + // Ensure that all initialized spans are deeply reset upon a call to Close. + op.maxSpansLength = len(op.spans) + } + op.spans = op.spans[:0] + } + op.shouldReset = false + + for i := range op.spanEncoders { + op.spanCols[i] = op.spanEncoders[i].next(batch) + } + + // TODO(drewk): currently we have to allocate each key individually because + // makeKVBatchFetcherWithSendFunc doesn't copy the underlying bytes of the + // spans. Consider making the batch fetcher perform a deep copy so we can + // store all keys in a flat byte slice, and reset and reuse it for each batch. + var spanBytes int + oldSpansCap := cap(op.spans) + for i := 0; i < n; i++ { + // Every key has a prefix encoding the table, index, etc. + op.scratchKey = append(op.scratchKey[:0], op.keyPrefix...) + for j := range op.spanCols { + // The encoding for each primary key column has previously been + // calculated and stored in an input column. + op.scratchKey = append(op.scratchKey, op.spanCols[j].Get(i)...) + } + // {{if .WithColFamilies}} + constructSpans(true) + // {{else}} + constructSpans(false) + // {{end}} + } + + spansSizeInc := (cap(op.spans) - oldSpansCap) * int(spanSize) + op.allocator.AdjustMemoryUsage(int64(spanBytes + spansSizeInc)) + return spanBytes >= op.sizeLimit +} + +// {{end}} + +const spanSize = unsafe.Sizeof(roachpb.Span{}) + +// GetSpans implements the ColSpanAssembler interface. +func (b *spanAssemblerBase) GetSpans() roachpb.Spans { + if b.shouldReset { + // No spans have been generated since the last call to GetSpans. + return nil + } + b.shouldReset = true + return b.spans +} + +// Close implements the ColSpanAssembler interface. +func (b *spanAssemblerBase) Close() { + for i := range b.spanEncoders { + b.spanEncoders[i].close() + } +} + +// Release implements the ColSpanAssembler interface. +func (b *spanAssemblerBase) Release() { + for i := range b.spanCols { + // Release references to input columns. + b.spanCols[i] = nil + } + b.spans = b.spans[:b.maxSpansLength] + for i := range b.spans { + // Deeply reset all spans that were initialized during execution. + b.spans[i] = roachpb.Span{} + } + + *b = spanAssemblerBase{ + spans: b.spans[:0], + } + spanAssemblerPool.Put(b) +} + +// execgen:inline +// execgen:template +func constructSpans(hasFamilies bool) { + if hasFamilies { + // The span for each row can be split into a series of column family spans, + // which have the column family ID as a suffix. Individual column family + // spans can be served as Get requests, which are more efficient than Scan + // requests. + for j := range op.colFamStartKeys { + var span roachpb.Span + span.Key = make(roachpb.Key, 0, len(op.scratchKey)+len(op.colFamStartKeys[j])) + span.Key = append(span.Key, op.scratchKey...) + span.Key = append(span.Key, op.colFamStartKeys[j]...) + spanBytes += len(span.Key) + // The end key may be nil, in which case the span is a point lookup. + if len(op.colFamEndKeys[j]) > 0 { + span.EndKey = make(roachpb.Key, 0, len(op.scratchKey)+len(op.colFamEndKeys[j])) + span.EndKey = append(span.EndKey, op.scratchKey...) + span.EndKey = append(span.EndKey, op.colFamEndKeys[j]...) + spanBytes += len(span.EndKey) + } + op.spans = append(op.spans, span) + } + } else { + // The spans cannot be split into column family spans, so there will be + // exactly one span for each input row. + var span roachpb.Span + span.Key = make(roachpb.Key, 0, len(op.scratchKey)) + span.Key = append(span.Key, op.scratchKey...) + spanBytes += len(span.Key) + span.EndKey = make(roachpb.Key, 0, len(op.scratchKey)+1) + span.EndKey = append(span.EndKey, op.scratchKey...) + // TODO(drewk): change this to use PrefixEnd() when interleaved indexes are + // permanently removed. Keep it this way for now to allow testing + // against the row engine, even though the vectorized index joiner doesn't + // allow interleaved indexes. + span.EndKey = encoding.EncodeInterleavedSentinel(span.EndKey) + spanBytes += len(span.EndKey) + op.spans = append(op.spans, span) + } +} + +// getColFamilyEncodings returns two lists of keys of the same length. Each pair +// of keys at the same index corresponds to the suffixes of the start and end +// keys of a span over a specific column family (or adjacent column families). +// If the returned lists are empty, the spans cannot be split into separate +// family spans. +func getColFamilyEncodings( + neededCols util.FastIntSet, table catalog.TableDescriptor, index catalog.Index, +) (startKeys, endKeys []roachpb.Key) { + familyIDs := rowenc.NeededColumnFamilyIDs(neededCols, table, index) + + if !canSplitSpans(len(familyIDs), table, index) { + return nil, nil + } + + for i, familyID := range familyIDs { + var key roachpb.Key + key = keys.MakeFamilyKey(key, uint32(familyID)) + if i > 0 && familyID-1 == familyIDs[i-1] && endKeys != nil { + // This column family is adjacent to the previous one. We can merge + // the two spans into one. + endKeys[len(endKeys)-1] = key.PrefixEnd() + } else { + startKeys = append(startKeys, key) + endKeys = append(endKeys, nil) + } + } + return startKeys, endKeys +} + +// canSplitSpans returns true if the spans that will be generated by the +// SpanAssembler operator can be split into spans over individual column +// families. For index joins, either all spans can be split or none can because +// the lookup columns are never nullable (null values prevent the index key from +// being fully knowable). +func canSplitSpans(numNeededFamilies int, table catalog.TableDescriptor, index catalog.Index) bool { + // We can only split a span into separate family specific point lookups if: + // + // * The table is not a special system table. (System tables claim to have + // column families, but actually do not, since they're written to with + // raw KV puts in a "legacy" way.) + if table.GetID() > 0 && table.GetID() < keys.MaxReservedDescID { + return false + } + + // * The index either has just 1 family (so we'll make a GetRequest) or we + // need fewer than every column family in the table (otherwise we'd just + // make a big ScanRequest). + numFamilies := len(table.GetFamilies()) + if numFamilies > 1 && numNeededFamilies == numFamilies { + return false + } + + // Other requirements that are always satisfied by index joins, and therefore + // do not need to be checked: + // * The index is unique. + // * The index is fully constrained. + // * If we're looking at a secondary index... + // * The index constraint must not contain null, since that would cause the + // index key to not be completely knowable. + // * The index cannot be inverted. + // * The index must store some columns. + // * The index is a new enough version. + // + // We've passed all the conditions, and should be able to safely split this + // span into multiple column-family-specific spans. + return true +} diff --git a/pkg/sql/colexec/colexecjoin/span_encoder.eg.go b/pkg/sql/colexec/colexecjoin/span_encoder.eg.go new file mode 100644 index 000000000000..c37ae539d7c1 --- /dev/null +++ b/pkg/sql/colexec/colexecjoin/span_encoder.eg.go @@ -0,0 +1,1936 @@ +// Code generated by execgen; DO NOT EDIT. +// Copyright 2021 The Cockroach Authors. +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colexecjoin + +import ( + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/col/coldataext" + "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/errors" +) + +// newSpanEncoder creates a new utility operator that, given input batches, +// generates the encoding for the given key column. It is used by SpanAssembler +// operators to generate spans for index joins and lookup joins. +func newSpanEncoder( + allocator *colmem.Allocator, typ *types.T, asc bool, encodeColIdx int, +) spanEncoder { + base := spanEncoderBase{ + allocator: allocator, + encodeColIdx: encodeColIdx, + } + switch asc { + case true: + switch typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) { + case types.BoolFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderBoolAsc{spanEncoderBase: base} + } + case types.BytesFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderBytesAsc{spanEncoderBase: base} + } + case types.DecimalFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderDecimalAsc{spanEncoderBase: base} + } + case types.IntFamily: + switch typ.Width() { + case 16: + return &spanEncoderInt16Asc{spanEncoderBase: base} + case 32: + return &spanEncoderInt32Asc{spanEncoderBase: base} + case -1: + default: + return &spanEncoderInt64Asc{spanEncoderBase: base} + } + case types.FloatFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderFloat64Asc{spanEncoderBase: base} + } + case types.TimestampTZFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderTimestampAsc{spanEncoderBase: base} + } + case types.IntervalFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderIntervalAsc{spanEncoderBase: base} + } + case typeconv.DatumVecCanonicalTypeFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderDatumAsc{spanEncoderBase: base} + } + } + case false: + switch typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) { + case types.BoolFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderBoolDesc{spanEncoderBase: base} + } + case types.BytesFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderBytesDesc{spanEncoderBase: base} + } + case types.DecimalFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderDecimalDesc{spanEncoderBase: base} + } + case types.IntFamily: + switch typ.Width() { + case 16: + return &spanEncoderInt16Desc{spanEncoderBase: base} + case 32: + return &spanEncoderInt32Desc{spanEncoderBase: base} + case -1: + default: + return &spanEncoderInt64Desc{spanEncoderBase: base} + } + case types.FloatFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderFloat64Desc{spanEncoderBase: base} + } + case types.TimestampTZFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderTimestampDesc{spanEncoderBase: base} + } + case types.IntervalFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderIntervalDesc{spanEncoderBase: base} + } + case typeconv.DatumVecCanonicalTypeFamily: + switch typ.Width() { + case -1: + default: + return &spanEncoderDatumDesc{spanEncoderBase: base} + } + } + } + colexecerror.InternalError(errors.AssertionFailedf("unsupported span encoder type %s", typ.Name())) + return nil +} + +type spanEncoder interface { + // next generates the encoding for the current key column for each row int the + // given batch, then returns each row's encoding as a value in a Bytes column. + // The returned Bytes column is owned by the spanEncoder operator and should + // not be modified. Calling next invalidates the results of previous calls to + // next. + next(batch coldata.Batch) *coldata.Bytes + + close() +} + +type spanEncoderBase struct { + allocator *colmem.Allocator + encodeColIdx int + + // outputBytes contains the encoding for each row of the key column. It is + // reused between calls to next(). + outputBytes *coldata.Bytes + + // A scratch bytes slice used to hold each encoding before it is appended to + // the output column. It is reused to avoid allocating for every row. + scratch []byte +} + +type spanEncoderBoolAsc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderBoolAsc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderBoolAsc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Bool() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + val := col.Get(i) + + var x int64 + if val { + x = 1 + } else { + x = 0 + } + op.scratch = encoding.EncodeVarintAscending(op.scratch, x) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + + var x int64 + if val { + x = 1 + } else { + x = 0 + } + op.scratch = encoding.EncodeVarintAscending(op.scratch, x) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + + var x int64 + if val { + x = 1 + } else { + x = 0 + } + op.scratch = encoding.EncodeVarintAscending(op.scratch, x) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + + var x int64 + if val { + x = 1 + } else { + x = 0 + } + op.scratch = encoding.EncodeVarintAscending(op.scratch, x) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderBytesAsc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderBytesAsc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderBytesAsc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Bytes() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeBytesAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeBytesAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeBytesAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeBytesAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderDecimalAsc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderDecimalAsc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderDecimalAsc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Decimal() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeDecimalAscending(op.scratch, &val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeDecimalAscending(op.scratch, &val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeDecimalAscending(op.scratch, &val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeDecimalAscending(op.scratch, &val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderInt16Asc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderInt16Asc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderInt16Asc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Int16() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeVarintAscending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeVarintAscending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeVarintAscending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeVarintAscending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderInt32Asc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderInt32Asc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderInt32Asc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Int32() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeVarintAscending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeVarintAscending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeVarintAscending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeVarintAscending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderInt64Asc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderInt64Asc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderInt64Asc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Int64() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeVarintAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeVarintAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeVarintAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeVarintAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderFloat64Asc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderFloat64Asc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderFloat64Asc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Float64() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeFloatAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeFloatAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeFloatAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeFloatAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderTimestampAsc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderTimestampAsc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderTimestampAsc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Timestamp() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeTimeAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeTimeAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeTimeAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeTimeAscending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderIntervalAsc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderIntervalAsc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderIntervalAsc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Interval() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + val := col.Get(i) + + var err error + op.scratch, err = encoding.EncodeDurationAscending(op.scratch, val) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + + var err error + op.scratch, err = encoding.EncodeDurationAscending(op.scratch, val) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + + var err error + op.scratch, err = encoding.EncodeDurationAscending(op.scratch, val) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + + var err error + op.scratch, err = encoding.EncodeDurationAscending(op.scratch, val) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderDatumAsc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderDatumAsc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderDatumAsc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Datum() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + val := col.Get(i) + + var err error + op.scratch, err = rowenc.EncodeTableKey(op.scratch, val.(*coldataext.Datum).Datum, encoding.Ascending) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + + var err error + op.scratch, err = rowenc.EncodeTableKey(op.scratch, val.(*coldataext.Datum).Datum, encoding.Ascending) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + continue + } + val := col.Get(i) + + var err error + op.scratch, err = rowenc.EncodeTableKey(op.scratch, val.(*coldataext.Datum).Datum, encoding.Ascending) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + + var err error + op.scratch, err = rowenc.EncodeTableKey(op.scratch, val.(*coldataext.Datum).Datum, encoding.Ascending) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderBoolDesc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderBoolDesc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderBoolDesc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Bool() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + val := col.Get(i) + + var x int64 + if val { + x = 1 + } else { + x = 0 + } + op.scratch = encoding.EncodeVarintDescending(op.scratch, x) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + + var x int64 + if val { + x = 1 + } else { + x = 0 + } + op.scratch = encoding.EncodeVarintDescending(op.scratch, x) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + + var x int64 + if val { + x = 1 + } else { + x = 0 + } + op.scratch = encoding.EncodeVarintDescending(op.scratch, x) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + + var x int64 + if val { + x = 1 + } else { + x = 0 + } + op.scratch = encoding.EncodeVarintDescending(op.scratch, x) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderBytesDesc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderBytesDesc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderBytesDesc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Bytes() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeBytesDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeBytesDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeBytesDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeBytesDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderDecimalDesc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderDecimalDesc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderDecimalDesc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Decimal() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeDecimalDescending(op.scratch, &val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeDecimalDescending(op.scratch, &val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeDecimalDescending(op.scratch, &val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeDecimalDescending(op.scratch, &val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderInt16Desc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderInt16Desc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderInt16Desc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Int16() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeVarintDescending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeVarintDescending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeVarintDescending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeVarintDescending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderInt32Desc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderInt32Desc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderInt32Desc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Int32() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeVarintDescending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeVarintDescending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeVarintDescending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeVarintDescending(op.scratch, int64(val)) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderInt64Desc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderInt64Desc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderInt64Desc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Int64() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeVarintDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeVarintDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeVarintDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeVarintDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderFloat64Desc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderFloat64Desc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderFloat64Desc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Float64() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeFloatDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeFloatDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeFloatDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeFloatDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderTimestampDesc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderTimestampDesc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderTimestampDesc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Timestamp() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + val := col.Get(i) + op.scratch = encoding.EncodeTimeDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + op.scratch = encoding.EncodeTimeDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeTimeDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + op.scratch = encoding.EncodeTimeDescending(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderIntervalDesc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderIntervalDesc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderIntervalDesc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Interval() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + val := col.Get(i) + + var err error + op.scratch, err = encoding.EncodeDurationDescending(op.scratch, val) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + + var err error + op.scratch, err = encoding.EncodeDurationDescending(op.scratch, val) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + //gcassert:bce + val := col.Get(i) + + var err error + op.scratch, err = encoding.EncodeDurationDescending(op.scratch, val) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + //gcassert:bce + val := col.Get(i) + + var err error + op.scratch, err = encoding.EncodeDurationDescending(op.scratch, val) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +type spanEncoderDatumDesc struct { + spanEncoderBase +} + +var _ spanEncoder = &spanEncoderDatumDesc{} + +// next implements the spanEncoder interface. +func (op *spanEncoderDatumDesc) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.Datum() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + val := col.Get(i) + + var err error + op.scratch, err = rowenc.EncodeTableKey(op.scratch, val.(*coldataext.Datum).Datum, encoding.Descending) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for _, i := range sel { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + + var err error + op.scratch, err = rowenc.EncodeTableKey(op.scratch, val.(*coldataext.Datum).Datum, encoding.Descending) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + if nulls.NullAt(i) { + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + continue + } + val := col.Get(i) + + var err error + op.scratch, err = rowenc.EncodeTableKey(op.scratch, val.(*coldataext.Datum).Datum, encoding.Descending) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } else { + for i := 0; i < n; i++ { + { + op.scratch = op.scratch[:0] + val := col.Get(i) + + var err error + op.scratch, err = rowenc.EncodeTableKey(op.scratch, val.(*coldataext.Datum).Datum, encoding.Descending) + if err != nil { + colexecerror.ExpectedError(err) + } + + op.outputBytes.AppendVal(op.scratch) + } + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +// close implements the spanEncoder interface. +func (b *spanEncoderBase) close() { + *b = spanEncoderBase{} +} + +// execgen:inline +const _ = "template_encodeSpan" + +// execgen:inline +const _ = "inlined_encodeSpan_true_true" + +// execgen:inline +const _ = "inlined_encodeSpan_true_false" + +// execgen:inline +const _ = "inlined_encodeSpan_false_true" + +// execgen:inline +const _ = "inlined_encodeSpan_false_false" diff --git a/pkg/sql/colexec/colexecjoin/span_encoder_tmpl.go b/pkg/sql/colexec/colexecjoin/span_encoder_tmpl.go new file mode 100644 index 000000000000..3251be8a81df --- /dev/null +++ b/pkg/sql/colexec/colexecjoin/span_encoder_tmpl.go @@ -0,0 +1,187 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// {{/* +// +build execgen_template +// +// This file is the execgen template for span_encoder.eg.go. It's formatted in a +// special way, so it's both valid Go and a valid text/template input. This +// permits editing this file with editor support. +// +// */}} + +package colexecjoin + +import ( + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/errors" +) + +// {{/* + +// Declarations to make the template compile properly. These are template +// variables which are replaced during code generation. +const _CANONICAL_TYPE_FAMILY = types.UnknownFamily +const _TYPE_WIDTH = 0 +const _IS_ASC = true + +// _ASSIGN_SPAN_ENCODING is a template addition function for assigning the first +// input to the result of encoding the second input. +func _ASSIGN_SPAN_ENCODING(_, _ string) { + colexecerror.InternalError(errors.AssertionFailedf("")) +} + +// */}} + +// newSpanEncoder creates a new utility operator that, given input batches, +// generates the encoding for the given key column. It is used by SpanAssembler +// operators to generate spans for index joins and lookup joins. +func newSpanEncoder( + allocator *colmem.Allocator, typ *types.T, asc bool, encodeColIdx int, +) spanEncoder { + base := spanEncoderBase{ + allocator: allocator, + encodeColIdx: encodeColIdx, + } + switch asc { + // {{range .}} + case _IS_ASC: + switch typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) { + // {{range .TypeFamilies}} + case _CANONICAL_TYPE_FAMILY: + switch typ.Width() { + // {{range .Overloads}} + case _TYPE_WIDTH: + return &_OP_STRING{spanEncoderBase: base} + // {{end}} + } + // {{end}} + } + // {{end}} + } + colexecerror.InternalError(errors.AssertionFailedf("unsupported span encoder type %s", typ.Name())) + return nil +} + +type spanEncoder interface { + // next generates the encoding for the current key column for each row int the + // given batch, then returns each row's encoding as a value in a Bytes column. + // The returned Bytes column is owned by the spanEncoder operator and should + // not be modified. Calling next invalidates the results of previous calls to + // next. + next(batch coldata.Batch) *coldata.Bytes + + close() +} + +type spanEncoderBase struct { + allocator *colmem.Allocator + encodeColIdx int + + // outputBytes contains the encoding for each row of the key column. It is + // reused between calls to next(). + outputBytes *coldata.Bytes + + // A scratch bytes slice used to hold each encoding before it is appended to + // the output column. It is reused to avoid allocating for every row. + scratch []byte +} + +// {{range .}} +// {{range .TypeFamilies}} +// {{range .Overloads}} + +type _OP_STRING struct { + spanEncoderBase +} + +var _ spanEncoder = &_OP_STRING{} + +// next implements the spanEncoder interface. +func (op *_OP_STRING) next(batch coldata.Batch) *coldata.Bytes { + n := batch.Length() + if n == 0 { + return nil + } + if op.outputBytes == nil { + op.outputBytes = coldata.NewBytes(n) + } + op.outputBytes.ResetForAppend() + oldBytesSize := op.outputBytes.Size() + + vec := batch.ColVec(op.encodeColIdx) + col := vec.TemplateType() + + sel := batch.Selection() + if sel != nil { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for _, i := range sel { + encodeSpan(true, true) + } + } else { + for _, i := range sel { + encodeSpan(true, false) + } + } + } else { + if vec.Nulls().MaybeHasNulls() { + nulls := vec.Nulls() + for i := 0; i < n; i++ { + encodeSpan(false, true) + } + } else { + for i := 0; i < n; i++ { + encodeSpan(false, false) + } + } + } + + op.allocator.AdjustMemoryUsage(int64(op.outputBytes.Size() - oldBytesSize)) + return op.outputBytes +} + +// {{end}} +// {{end}} +// {{end}} + +// close implements the spanEncoder interface. +func (b *spanEncoderBase) close() { + *b = spanEncoderBase{} +} + +// execgen:inline +// execgen:template +func encodeSpan(hasSel bool, hasNulls bool) { + op.scratch = op.scratch[:0] + if hasNulls { + if nulls.NullAt(i) { + // {{if .Asc}} + op.outputBytes.AppendVal(encoding.EncodeNullAscending(op.scratch)) + // {{else}} + op.outputBytes.AppendVal(encoding.EncodeNullDescending(op.scratch)) + // {{end}} + continue + } + } + if !hasSel { + // {{if .Sliceable}} + //gcassert:bce + // {{end}} + } + val := col.Get(i) + _ASSIGN_SPAN_ENCODING(op.scratch, val) + op.outputBytes.AppendVal(op.scratch) +} diff --git a/pkg/sql/colexec/execgen/cmd/execgen/BUILD.bazel b/pkg/sql/colexec/execgen/cmd/execgen/BUILD.bazel index a4082a214493..31712ec8323b 100644 --- a/pkg/sql/colexec/execgen/cmd/execgen/BUILD.bazel +++ b/pkg/sql/colexec/execgen/cmd/execgen/BUILD.bazel @@ -47,6 +47,8 @@ go_library( "select_in_gen.go", "selection_ops_gen.go", "sort_gen.go", + "span_assembler_gen.go", + "span_encoder_gen.go", "substring_gen.go", "sum_agg_gen.go", "values_differ_gen.go", diff --git a/pkg/sql/colexec/execgen/cmd/execgen/span_assembler_gen.go b/pkg/sql/colexec/execgen/cmd/execgen/span_assembler_gen.go new file mode 100644 index 000000000000..526c8b18ec4e --- /dev/null +++ b/pkg/sql/colexec/execgen/cmd/execgen/span_assembler_gen.go @@ -0,0 +1,44 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package main + +import ( + "io" + "strings" + "text/template" +) + +type spanAssemblerTmplInfo struct { + WithColFamilies bool + String string +} + +const spanAssemblerTmpl = "pkg/sql/colexec/colexecjoin/span_assembler_tmpl.go" + +func genSpanAssemblerOp(inputFileContents string, wr io.Writer) error { + s := strings.ReplaceAll(inputFileContents, "_OP_STRING", "{{.String}}") + + // Now, generate the op, from the template. + tmpl, err := template.New("span_assembler_op").Funcs(template.FuncMap{"buildDict": buildDict}).Parse(s) + if err != nil { + return err + } + + spanAssemblerTmplInfos := []spanAssemblerTmplInfo{ + {WithColFamilies: false, String: "spanAssemblerNoColFamily"}, + {WithColFamilies: true, String: "spanAssemblerWithColFamily"}, + } + return tmpl.Execute(wr, spanAssemblerTmplInfos) +} + +func init() { + registerGenerator(genSpanAssemblerOp, "span_assembler.eg.go", spanAssemblerTmpl) +} diff --git a/pkg/sql/colexec/execgen/cmd/execgen/span_encoder_gen.go b/pkg/sql/colexec/execgen/cmd/execgen/span_encoder_gen.go new file mode 100644 index 000000000000..9ce91ddd1221 --- /dev/null +++ b/pkg/sql/colexec/execgen/cmd/execgen/span_encoder_gen.go @@ -0,0 +1,177 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package main + +import ( + "fmt" + "io" + "strings" + "text/template" + + "github.com/cockroachdb/cockroach/pkg/col/typeconv" + "github.com/cockroachdb/cockroach/pkg/sql/types" +) + +const spanEncoderTmpl = "pkg/sql/colexec/colexecjoin/span_encoder_tmpl.go" + +func genSpanEncoder(inputFileContents string, wr io.Writer) error { + r := strings.NewReplacer( + "_OP_STRING", "{{.OpName}}", + "_IS_ASC", "{{.Asc}}", + "_CANONICAL_TYPE_FAMILY", "{{.TypeFamily}}", + "_TYPE_WIDTH", typeWidthReplacement, + "TemplateType", "{{.VecMethod}}", + ) + s := r.Replace(inputFileContents) + + assignAddRe := makeFunctionRegex("_ASSIGN_SPAN_ENCODING", 2) + s = assignAddRe.ReplaceAllString(s, makeTemplateFunctionCall("AssignSpanEncoding", 2)) + + s = replaceManipulationFuncs(s) + + tmpl, err := template.New("span_encoder").Funcs(template.FuncMap{"buildDict": buildDict}).Parse(s) + if err != nil { + return err + } + + var infos []spanEncoderDirectionInfo + for _, asc := range []bool{true, false} { + info := spanEncoderDirectionInfo{Asc: asc} + for _, family := range supportedCanonicalTypeFamilies { + if family == types.JsonFamily { + // We are currently unable to encode JSON as a table key. + continue + } + familyInfo := spanEncoderTypeFamilyInfo{TypeFamily: toString(family)} + for _, width := range supportedWidthsByCanonicalTypeFamily[family] { + overload := spanEncoderTmplInfo{ + Asc: asc, + Sliceable: sliceable(family), + Width: width, + OpName: getSpanEncoderOpName(asc, family, width), + VecMethod: toVecMethod(family, width), + TypeFamily: family, + } + familyInfo.Overloads = append(familyInfo.Overloads, overload) + } + info.TypeFamilies = append(info.TypeFamilies, familyInfo) + } + infos = append(infos, info) + } + return tmpl.Execute(wr, infos) +} + +func init() { + registerGenerator(genSpanEncoder, "span_encoder.eg.go", spanEncoderTmpl) +} + +type spanEncoderDirectionInfo struct { + Asc bool + TypeFamilies []spanEncoderTypeFamilyInfo +} + +type spanEncoderTypeFamilyInfo struct { + TypeFamily string + Overloads []spanEncoderTmplInfo +} + +type spanEncoderTmplInfo struct { + Asc bool + Sliceable bool + Width int32 + OpName string + VecMethod string + TypeFamily types.Family +} + +func (info spanEncoderTmplInfo) AssignSpanEncoding(appendTo, valToEncode string) string { + assignEncoding := func(funcName, val string) string { + ascString := "Ascending" + if !info.Asc { + ascString = "Descending" + } + return fmt.Sprintf("%[1]s = encoding.Encode%[2]s%[3]s(%[1]s, %[4]s)", + appendTo, funcName, ascString, val) + } + + switch info.TypeFamily { + case types.IntFamily: + funcName := "Varint" + if info.Width == 16 || info.Width == 32 { + // We need to cast the input to an int64. + valToEncode = "int64(" + valToEncode + ")" + } + return assignEncoding(funcName, valToEncode) + case types.BoolFamily: + funcName := "Varint" + prefix := fmt.Sprintf(` + var x int64 + if %s { + x = 1 + } else { + x = 0 + } + `, valToEncode) + valToEncode = "x" + return prefix + assignEncoding(funcName, valToEncode) + case types.FloatFamily: + funcName := "Float" + return assignEncoding(funcName, valToEncode) + case types.DecimalFamily: + funcName := "Decimal" + valToEncode = "&" + valToEncode + return assignEncoding(funcName, valToEncode) + case types.BytesFamily: + funcName := "Bytes" + return assignEncoding(funcName, valToEncode) + case types.TimestampTZFamily: + funcName := "Time" + return assignEncoding(funcName, valToEncode) + case types.IntervalFamily: + funcName := "DurationAscending" + if !info.Asc { + funcName = "DurationDescending" + } + return fmt.Sprintf(` + var err error + %[1]s, err = encoding.Encode%[2]s(%[1]s, %[3]s) + if err != nil { + colexecerror.ExpectedError(err) + } + `, appendTo, funcName, valToEncode) + case typeconv.DatumVecCanonicalTypeFamily: + dir := "encoding.Ascending" + if !info.Asc { + dir = "encoding.Descending" + } + valToEncode += ".(*coldataext.Datum).Datum" + return fmt.Sprintf(` + var err error + %[1]s, err = rowenc.EncodeTableKey(%[1]s, %[2]s, %[3]s) + if err != nil { + colexecerror.ExpectedError(err) + } + `, appendTo, valToEncode, dir) + } + return fmt.Sprintf("unsupported type: %s", info.TypeFamily.Name()) +} + +var _ = spanEncoderTmplInfo{}.AssignSpanEncoding + +func getSpanEncoderOpName(asc bool, family types.Family, width int32) string { + opName := "spanEncoder" + toVecMethod(family, width) + if asc { + opName += "Asc" + } else { + opName += "Desc" + } + return opName +} diff --git a/pkg/sql/colfetcher/BUILD.bazel b/pkg/sql/colfetcher/BUILD.bazel index 9057bc245edf..db7bbd6e9783 100644 --- a/pkg/sql/colfetcher/BUILD.bazel +++ b/pkg/sql/colfetcher/BUILD.bazel @@ -6,6 +6,7 @@ go_library( srcs = [ "cfetcher.go", "colbatch_scan.go", + "index_join.go", ":gen-fetcherstate-stringer", # keep ], importpath = "github.com/cockroachdb/cockroach/pkg/sql/colfetcher", @@ -21,6 +22,7 @@ go_library( "//pkg/sql/catalog/tabledesc", "//pkg/sql/colconv", "//pkg/sql/colencoding", + "//pkg/sql/colexec/colexecjoin", "//pkg/sql/colexecerror", "//pkg/sql/colexecop", "//pkg/sql/colmem", diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 68345fa622c0..bcf70affbefe 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -42,8 +42,9 @@ import ( // should get rid off table readers entirely. We will have to be careful about // propagating the metadata though. -// ColBatchScan is the exec.Operator implementation of TableReader. It reads a table -// from kv, presenting it as coldata.Batches via the exec.Operator interface. +// ColBatchScan is the exec.Operator implementation of TableReader. It reads a +// table from kv, presenting it as coldata.Batches via the exec.Operator +// interface. type ColBatchScan struct { colexecop.ZeroInputNode colexecop.InitHelper diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go new file mode 100644 index 000000000000..b12e9da13506 --- /dev/null +++ b/pkg/sql/colfetcher/index_join.go @@ -0,0 +1,336 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colfetcher + +import ( + "context" + "sort" + "time" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecjoin" + "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" + "github.com/cockroachdb/cockroach/pkg/sql/colexecop" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/row" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/errors" +) + +// ColIndexJoin operators are used to execute index joins (lookup joins that +// scan the primary index and discard input rows). +type ColIndexJoin struct { + colexecop.InitHelper + colexecop.OneInputNode + + state indexJoinState + + // spanAssembler is used to construct the lookup spans for each input batch. + spanAssembler colexecjoin.ColSpanAssembler + + flowCtx *execinfra.FlowCtx + rf *cFetcher + + // tracingSpan is created when the stats should be collected for the query + // execution, and it will be finished when closing the operator. + tracingSpan *tracing.Span + mu struct { + syncutil.Mutex + // rowsRead contains the number of total rows this ColBatchScan has + // returned so far. + rowsRead int64 + } + // ResultTypes is the slice of resulting column types from this operator. + // It should be used rather than the slice of column types from the scanned + // table because the scan might synthesize additional implicit system columns. + ResultTypes []*types.T + + // maintainOrdering is true when the index join is required to maintain its + // input ordering, in which case the ordering of the spans cannot be changed. + maintainOrdering bool + + // Metadata fields. + misplannedRanges []roachpb.RangeInfo + nodeID roachpb.NodeID +} + +var _ colexecop.KVReader = &ColIndexJoin{} +var _ execinfra.Releasable = &ColIndexJoin{} +var _ colexecop.Closer = &ColIndexJoin{} +var _ colexecop.Operator = &ColIndexJoin{} + +// Init initializes a ColIndexJoin. +func (s *ColIndexJoin) Init(ctx context.Context) { + if !s.InitHelper.Init(ctx) { + return + } + s.Input.Init(ctx) + // If tracing is enabled, we need to start a child span so that the only + // contention events present in the recording would be because of this + // cFetcher. Note that ProcessorSpan method itself will check whether + // tracing is enabled. + s.Ctx, s.tracingSpan = execinfra.ProcessorSpan(s.Ctx, "colindexjoin") +} + +type indexJoinState uint8 + +const ( + indexJoinConstructingSpans indexJoinState = iota + indexJoinScanning + indexJoinDone +) + +// Next is part of the Operator interface. +func (s *ColIndexJoin) Next() coldata.Batch { + for { + switch s.state { + case indexJoinConstructingSpans: + var spans roachpb.Spans + for batch := s.Input.Next(); ; batch = s.Input.Next() { + // Because index joins discard input rows, we do not have to maintain a + // reference to input tuples after span generation. + if batch.Length() == 0 || s.spanAssembler.ConsumeBatch(batch) { + // Reached the memory limit or the end of the input. + spans = s.spanAssembler.GetSpans() + break + } + } + if len(spans) == 0 { + // No lookups left to perform. + s.state = indexJoinDone + continue + } + + if !s.maintainOrdering { + // Sort the spans for the following cases: + // - For lookupJoinReaderType: this is so that we can rely upon the + // fetcher to limit the number of results per batch. It's safe to + // reorder the spans here because we already restore the original + // order of the output during the output collection phase. + // - For indexJoinReaderType when !maintainOrdering: this allows lower + // layers to optimize iteration over the data. Note that the looked up + // rows are output unchanged, in the retrieval order, so it is not + // safe to do this when maintainOrdering is true (the ordering to be + // maintained may be different than the ordering in the index). + sort.Sort(spans) + } + + // Handle metadata for these spans. + if s.nodeID != 0 { + s.misplannedRanges = append(s.misplannedRanges, + execinfra.MisplannedRanges(s.Ctx, spans, s.nodeID, s.flowCtx.Cfg.RangeCache)...) + } + + if err := s.rf.StartScan( + s.flowCtx.Txn, spans, false /* limitBatches */, 0 /* limitHint */, s.flowCtx.TraceKV, + s.flowCtx.EvalCtx.TestingKnobs.ForceProductionBatchSizes, + ); err != nil { + colexecerror.InternalError(err) + } + s.state = indexJoinScanning + case indexJoinScanning: + batch, err := s.rf.NextBatch(s.Ctx) + if err != nil { + colexecerror.InternalError(err) + } + if batch.Selection() != nil { + colexecerror.InternalError( + errors.AssertionFailedf("unexpected selection vector on the batch coming from CFetcher")) + } + n := batch.Length() + if n == 0 { + s.state = indexJoinConstructingSpans + continue + } + s.mu.Lock() + s.mu.rowsRead += int64(n) + s.mu.Unlock() + return batch + case indexJoinDone: + return coldata.ZeroBatch + } + } +} + +// DrainMeta is part of the colexecop.MetadataSource interface. +func (s *ColIndexJoin) DrainMeta() []execinfrapb.ProducerMetadata { + var trailingMeta []execinfrapb.ProducerMetadata + if !s.flowCtx.Local { + if s.nodeID != 0 && s.misplannedRanges != nil { + trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{Ranges: s.misplannedRanges}) + } + } + if tfs := execinfra.GetLeafTxnFinalState(s.Ctx, s.flowCtx.Txn); tfs != nil { + trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) + } + meta := execinfrapb.GetProducerMeta() + meta.Metrics = execinfrapb.GetMetricsMeta() + meta.Metrics.BytesRead = s.GetBytesRead() + meta.Metrics.RowsRead = s.GetRowsRead() + trailingMeta = append(trailingMeta, *meta) + if trace := execinfra.GetTraceData(s.Ctx); trace != nil { + trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{TraceData: trace}) + } + return trailingMeta +} + +// GetBytesRead is part of the colexecop.KVReader interface. +func (s *ColIndexJoin) GetBytesRead() int64 { + s.mu.Lock() + defer s.mu.Unlock() + // Note that if Init() was never called, s.rf.fetcher will remain nil, and + // GetBytesRead() will return 0. We are also holding the mutex, so a + // concurrent call to Init() will have to wait, and the fetcher will remain + // uninitialized until we return. + return s.rf.fetcher.GetBytesRead() +} + +// GetRowsRead is part of the colexecop.KVReader interface. +func (s *ColIndexJoin) GetRowsRead() int64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.rowsRead +} + +// GetCumulativeContentionTime is part of the colexecop.KVReader interface. +func (s *ColIndexJoin) GetCumulativeContentionTime() time.Duration { + return execinfra.GetCumulativeContentionTime(s.Ctx) +} + +// NewColIndexJoin creates a new ColIndexJoin operator. +func NewColIndexJoin( + ctx context.Context, + allocator *colmem.Allocator, + flowCtx *execinfra.FlowCtx, + evalCtx *tree.EvalContext, + semaCtx *tree.SemaContext, + input colexecop.Operator, + spec *execinfrapb.JoinReaderSpec, + post *execinfrapb.PostProcessSpec, + inputTypes []*types.T, +) (*ColIndexJoin, error) { + // NB: we hit this with a zero NodeID (but !ok) with multi-tenancy. + var ok bool + var nodeID roachpb.NodeID + if nodeID, ok = flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok { + return nil, errors.Errorf("attempting to create a ColIndexJoin with uninitialized NodeID") + } + if !spec.LookupExpr.Empty() { + return nil, errors.AssertionFailedf("non-empty lookup expressions are not supported for index joins") + } + if !spec.RemoteLookupExpr.Empty() { + return nil, errors.AssertionFailedf("non-empty remote lookup expressions are not supported for index joins") + } + if !spec.OnExpr.Empty() { + return nil, errors.AssertionFailedf("non-empty ON expressions are not supported for index joins") + } + + // TODO(ajwerner): The need to construct an immutable here + // indicates that we're probably doing this wrong. Instead we should be + // just setting the ID and Version in the spec or something like that and + // retrieving the hydrated immutable from cache. + table := spec.BuildTableDescriptor() + + cols := table.PublicColumns() + if spec.Visibility == execinfra.ScanVisibilityPublicAndNotPublic { + cols = table.DeletableColumns() + } + columnIdxMap := catalog.ColumnIDToOrdinalMap(cols) + typs := catalog.ColumnTypesWithVirtualCol(cols, nil /* virtualCol */) + + // Add all requested system columns to the output. + if spec.HasSystemColumns { + for _, sysCol := range table.SystemColumns() { + typs = append(typs, sysCol.GetType()) + columnIdxMap.Set(sysCol.GetID(), columnIdxMap.Len()) + } + } + + // Before we can safely use types from the table descriptor, we need to + // make sure they are hydrated. In row execution engine it is done during + // the processor initialization, but neither ColBatchScan nor cFetcher are + // processors, so we need to do the hydration ourselves. + resolver := flowCtx.TypeResolverFactory.NewTypeResolver(evalCtx.Txn) + if err := resolver.HydrateTypeSlice(ctx, typs); err != nil { + return nil, err + } + + indexIdx := int(spec.IndexIdx) + if indexIdx >= len(table.ActiveIndexes()) { + return nil, errors.Errorf("invalid indexIdx %d", indexIdx) + } + index := table.ActiveIndexes()[indexIdx] + + proc := &execinfra.ProcOutputHelper{} + if err := proc.Init(post, typs, semaCtx, evalCtx); err != nil { + colexecerror.InternalError(err) + } + neededColumns := proc.NeededColumns() + + tableArgs := row.FetcherTableArgs{ + Desc: table, + Index: index, + ColIdxMap: columnIdxMap, + IsSecondaryIndex: !index.Primary(), + ValNeededForCol: neededColumns, + } + + tableArgs.InitCols(table, spec.Visibility, spec.HasSystemColumns, nil /* virtualColumn */) + + fetcher := cFetcherPool.Get().(*cFetcher) + fetcher.estimatedRowCount = 0 + if err := fetcher.Init( + flowCtx.Codec(), allocator, execinfra.GetWorkMemLimit(flowCtx), false, /* false */ + spec.LockingStrength, spec.LockingWaitPolicy, tableArgs, + ); err != nil { + return nil, err + } + + spanAssembler := colexecjoin.NewColSpanAssembler( + flowCtx.Codec(), allocator, table, index, inputTypes, neededColumns, defaultBatchSizeLimit) + + return &ColIndexJoin{ + OneInputNode: colexecop.NewOneInputNode(input), + flowCtx: flowCtx, + rf: fetcher, + spanAssembler: spanAssembler, + ResultTypes: typs, + maintainOrdering: spec.MaintainOrdering, + nodeID: nodeID, + }, nil +} + +// defaultBatchSizeLimit is the limit on the total number of bytes that can be +// allocated in a given batch for the span keys. +const defaultBatchSizeLimit = 4 << 20 /* 4 MB */ + +// Release implements the execinfra.Releasable interface. +func (s *ColIndexJoin) Release() { + s.rf.Release() + s.spanAssembler.Release() +} + +// Close implements the colexecop.Closer interface. +func (s *ColIndexJoin) Close() error { + if s.tracingSpan != nil { + s.tracingSpan.Finish() + s.tracingSpan = nil + } + s.spanAssembler.Close() + return nil +}