Skip to content

Commit

Permalink
colexec: implement vectorized index join
Browse files Browse the repository at this point in the history
This patch provides a vectorized implementation of the index join
operator. Span generation is accomplished using two utility operators.
`spanEncoder` operates on a single index key column and fills a `Bytes`
column with the encoding of that column for each row. `spanAssembler`
takes the output of each `spanEncoder` and generates spans, accounting
for table/index prefixes and possibly splitting the spans over column
families. Finally, the `ColIndexJoin` operator uses the generated spans
to perform a lookup on the table's primary index, returns all batches
resulting from the lookup, and repeats until the input is fully consumed.

The `spanAssembler` operators queue up spans until the memory allocated
for the span keys reaches a preset limit (default 4MB). This allows the
cost of starting a scan to be amortized.

Addresses #65905

Release note (sql change): The vectorized execution engine can now
perform a scan over an index, and then join on the primary index to
retrieve the required columns.
  • Loading branch information
DrewKimball committed Aug 10, 2021
1 parent 548fa91 commit 7de64d4
Show file tree
Hide file tree
Showing 25 changed files with 4,132 additions and 111 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,8 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/colexecsel/default_cmp_sel_ops.eg.go \
pkg/sql/colexec/colexecsel/selection_ops.eg.go \
pkg/sql/colexec/colexecsel/sel_like_ops.eg.go \
pkg/sql/colexec/colexecspan/span_assembler.eg.go \
pkg/sql/colexec/colexecspan/span_encoder.eg.go \
pkg/sql/colexec/colexecwindow/first_value.eg.go \
pkg/sql/colexec/colexecwindow/lag.eg.go \
pkg/sql/colexec/colexecwindow/last_value.eg.go \
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ ALL_TESTS = [
"//pkg/sql/colexec/colexecjoin:colexecjoin_test",
"//pkg/sql/colexec/colexecproj:colexecproj_test",
"//pkg/sql/colexec/colexecsel:colexecsel_test",
"//pkg/sql/colexec/colexecspan:colexecspan_test",
"//pkg/sql/colexec/colexectestutils:colexectestutils_test",
"//pkg/sql/colexec/colexecutils:colexecutils_test",
"//pkg/sql/colexec/colexecwindow:colexecwindow_test",
Expand Down
22 changes: 22 additions & 0 deletions pkg/col/coldata/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ const FlatBytesOverhead = int64(unsafe.Sizeof(Bytes{}))

// Size returns the total size of the receiver in bytes.
func (b *Bytes) Size() int64 {
if b == nil {
return 0
}
return FlatBytesOverhead +
int64(cap(b.data)) +
int64(cap(b.offsets))*memsize.Int32
Expand All @@ -400,6 +403,16 @@ func (b *Bytes) ProportionalSize(n int64) int64 {
return FlatBytesOverhead + int64(len(b.data[b.offsets[0]:b.offsets[n]])) + n*memsize.Int32
}

// ElemSize returns the size in bytes of the []byte elem at the given index.
// Panics if passed an invalid element.
func (b *Bytes) ElemSize(idx int) int64 {
if idx < 0 || idx >= b.Len() {
colexecerror.InternalError(
errors.AssertionFailedf("called ElemSize with invalid index: %d", idx))
}
return int64(b.offsets[idx+1] - b.offsets[idx])
}

// Abbreviated returns a uint64 slice where each uint64 represents the first
// eight bytes of each []byte. It is used for byte comparison fast paths.
//
Expand Down Expand Up @@ -456,6 +469,15 @@ func (b *Bytes) Reset() {
b.maxSetLength = 0
}

// ResetForAppend is similar to Reset, but it also resets the offsets slice so
// that future calls to AppendSlice or AppendVal will append starting from index
// zero. TODO(drewk): once Set is removed, this can just be Reset.
func (b *Bytes) ResetForAppend() {
b.Reset()
// The first offset indicates where the first element will start.
b.offsets = b.offsets[:1]
}

// Truncate truncates the underlying bytes to the given length. This allows Set
// to be called at or beyond the given length. If the length of the bytes is
// already less than or equal to the given length, Truncate is a no-op.
Expand Down
2 changes: 1 addition & 1 deletion pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func init() {
// PrettyPrintRange pretty prints a compact representation of a key range. The
// output is of the form:
// commonPrefix{remainingStart-remainingEnd}
// If the end key is empty, the outut is of the form:
// If the end key is empty, the output is of the form:
// start
// It prints at most maxChars, truncating components as needed. See
// TestPrettyPrintRange for some examples.
Expand Down
50 changes: 50 additions & 0 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,20 @@ func supportedNatively(spec *execinfrapb.ProcessorSpec) error {
}
return nil

case spec.Core.JoinReader != nil:
if spec.Core.JoinReader.LookupColumns != nil || !spec.Core.JoinReader.LookupExpr.Empty() {
return errLookupJoinUnsupported
}
for i := range spec.Core.JoinReader.Table.Indexes {
if spec.Core.JoinReader.Table.Indexes[i].IsInterleaved() {
// Interleaved indexes are going to be removed anyway, so there is no
// point in handling the extra complexity. Just let the row engine
// handle this.
return errInterleavedIndexJoin
}
}
return nil

case spec.Core.Filterer != nil:
return nil

Expand Down Expand Up @@ -249,6 +263,8 @@ var (
errSampleAggregatorWrap = errors.New("core.SampleAggregator is not supported (not an execinfra.RowSource)")
errExperimentalWrappingProhibited = errors.New("wrapping for non-JoinReader and non-LocalPlanNode cores is prohibited in vectorize=experimental_always")
errWrappedCast = errors.New("mismatched types in NewColOperator and unsupported casts")
errLookupJoinUnsupported = errors.New("lookup join reader is unsupported in vectorized")
errInterleavedIndexJoin = errors.New("vectorized join reader is unsupported for interleaved indexes")
)

func canWrap(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.ProcessorSpec) error {
Expand Down Expand Up @@ -782,6 +798,40 @@ func NewColOperator(
result.ColumnTypes = scanOp.ResultTypes
result.ToClose = append(result.ToClose, scanOp)

case core.JoinReader != nil:
if err := checkNumIn(inputs, 1); err != nil {
return r, err
}
if core.JoinReader.LookupColumns != nil || !core.JoinReader.LookupExpr.Empty() {
return r, errors.AssertionFailedf("lookup join reader is unsupported in vectorized")
}
// We have to create a separate account in order for the cFetcher to
// be able to precisely track the size of its output batch. This
// memory account is "streaming" in its nature, so we create an
// unlimited one.
cFetcherMemAcc := result.createUnlimitedMemAccount(
ctx, flowCtx, "cfetcher" /* opName */, spec.ProcessorID,
)
semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn)
inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes))
copy(inputTypes, spec.Input[0].ColumnTypes)
indexJoinOp, err := colfetcher.NewColIndexJoin(
ctx, streamingAllocator, colmem.NewAllocator(ctx, cFetcherMemAcc, factory),
flowCtx, evalCtx, semaCtx, inputs[0].Root, core.JoinReader, post, inputTypes)
if err != nil {
return r, err
}
result.Root = indexJoinOp
if util.CrdbTestBuild {
result.Root = colexec.NewInvariantsChecker(result.Root)
}
result.KVReader = indexJoinOp
result.MetadataSources = append(result.MetadataSources, result.Root.(colexecop.MetadataSource))
result.Releasables = append(result.Releasables, indexJoinOp)
result.Root = colexecutils.NewCancelChecker(result.Root)
result.ColumnTypes = indexJoinOp.ResultTypes
result.ToClose = append(result.ToClose, indexJoinOp)

case core.Filterer != nil:
if err := checkNumIn(inputs, 1); err != nil {
return r, err
Expand Down
83 changes: 83 additions & 0 deletions pkg/sql/colexec/colexecspan/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//pkg/sql/colexecop:EXECGEN.bzl", "eg_go_filegroup", "gen_eg_go_rules")

go_library(
name = "colexecspan",
srcs = [
":gen-exec", # keep
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecspan", # keep
visibility = ["//visibility:public"],
deps = [
"//pkg/col/coldata", # keep
"//pkg/col/coldataext", # keep
"//pkg/col/typeconv", # keep
"//pkg/keys", # keep
"//pkg/roachpb", # keep
"//pkg/sql/catalog", # keep
"//pkg/sql/catalog/descpb", # keep
"//pkg/sql/colexecerror", # keep
"//pkg/sql/colmem", # keep
"//pkg/sql/execinfra", # keep
"//pkg/sql/rowenc", # keep
"//pkg/sql/sem/tree", # keep
"//pkg/sql/types", # keep
"//pkg/util", # keep
"//pkg/util/duration", # keep
"//pkg/util/encoding", # keep
"@com_github_cockroachdb_apd_v2//:apd", # keep
"@com_github_cockroachdb_errors//:errors", # keep
],
)

go_test(
name = "colexecspan_test",
srcs = [
"dep_test.go",
"main_test.go",
"span_assembler_test.go",
],
embed = [":colexecspan"], # keep
deps = [
"//pkg/col/coldata",
"//pkg/col/coldataext",
"//pkg/col/coldatatestutils",
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/colconv",
"//pkg/sql/colexec/colexectestutils",
"//pkg/sql/colexecerror",
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/span",
"//pkg/sql/types",
"//pkg/testutils/buildutil",
"//pkg/testutils/skip",
"//pkg/util",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
],
)

# Map between target name and relevant template.
targets = [
("span_assembler.eg.go", "span_assembler_tmpl.go"),
("span_encoder.eg.go", "span_encoder_tmpl.go"),
]

# Define a file group for all the .eg.go targets.
eg_go_filegroup(
name = "gen-exec",
targets = targets,
)

# Define gen rules for individual eg.go files.
gen_eg_go_rules(targets)
31 changes: 31 additions & 0 deletions pkg/sql/colexec/colexecspan/dep_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexecspan

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/testutils/buildutil"
)

func TestNoLinkForbidden(t *testing.T) {
buildutil.VerifyNoImports(t,
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecspan", true,
[]string{
"github.com/cockroachdb/cockroach/pkg/sql/colexec",
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecagg",
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexechash",
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecjoin",
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecproj",
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecsel",
}, nil,
)
}
40 changes: 40 additions & 0 deletions pkg/sql/colexec/colexecspan/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexecspan

import (
"flag"
"fmt"
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

func TestMain(m *testing.M) {
randutil.SeedForTests()
os.Exit(func() int {
flag.Parse()
if !skip.UnderBench() {
// (If we're running benchmarks, don't set a random batch size.)
randomBatchSize := colexectestutils.GenerateBatchSize()
fmt.Printf("coldata.BatchSize() is set to %d\n", randomBatchSize)
if err := coldata.SetBatchSizeForTests(randomBatchSize); err != nil {
colexecerror.InternalError(err)
}
}
return m.Run()
}())
}
Loading

0 comments on commit 7de64d4

Please sign in to comment.