Skip to content

Commit

Permalink
colexec: implement vectorized index join
Browse files Browse the repository at this point in the history
This patch provides a vectorized implementation of the index join
operator. Span generation is accomplished using two utility operators.
`spanEncoder` operates on a single index key column and fills a `Bytes`
column with the encoding of that column for each row. `spanAssembler`
takes the output of each `spanEncoder` and generates spans, accounting
for table/index prefixes and possibly splitting the spans over column
families. Finally, the `ColIndexJoin` operator uses the generated spans
to perform a lookup on the table's primary index, returns all batches
resulting from the lookup, and repeats until the input is fully consumed.

The `spanAssembler` operators queue up spans until the memory allocated
for the span keys reaches a preset limit (default 4MB). This allows the
cost of starting a scan to be amortized.

Addresses cockroachdb#65905

Release note (sql change): The vectorized execution engine can now
perform a scan over an index, and then join on the primary index to
retrieve the required columns.
  • Loading branch information
DrewKimball committed Jul 12, 2021
1 parent 6acf9ff commit dae3f86
Show file tree
Hide file tree
Showing 19 changed files with 3,800 additions and 24 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,8 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/colexecjoin/mergejoiner_rightanti.eg.go \
pkg/sql/colexec/colexecjoin/mergejoiner_rightouter.eg.go \
pkg/sql/colexec/colexecjoin/mergejoiner_rightsemi.eg.go \
pkg/sql/colexec/colexecjoin/span_assembler.eg.go \
pkg/sql/colexec/colexecjoin/span_encoder.eg.go \
pkg/sql/colexec/colexecproj/default_cmp_proj_ops.eg.go \
pkg/sql/colexec/colexecproj/proj_const_left_ops.eg.go \
pkg/sql/colexec/colexecproj/proj_const_right_ops.eg.go \
Expand Down
9 changes: 9 additions & 0 deletions pkg/col/coldata/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,15 @@ func (b *Bytes) Reset() {
b.maxSetLength = 0
}

// ResetForAppend is similar to Reset, but it also resets the offsets slice so
// that future calls to AppendSlice or AppendVal will append starting from index
// zero. It calls Reset and then truncates b.offsets to its first element.
// NOTE(review): the truncation assumes b.offsets already holds at least one
// element; confirm that every way of constructing a Bytes guarantees this.
// TODO(drewk): once Set is removed, this can just be Reset.
func (b *Bytes) ResetForAppend() {
b.Reset()
// The first offset indicates where the first element will start.
b.offsets = b.offsets[:1]
}

// String is used for debugging purposes.
func (b *Bytes) String() string {
var builder strings.Builder
Expand Down
2 changes: 1 addition & 1 deletion pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func init() {
// PrettyPrintRange pretty prints a compact representation of a key range. The
// output is of the form:
// commonPrefix{remainingStart-remainingEnd}
// If the end key is empty, the outut is of the form:
// If the end key is empty, the output is of the form:
// start
// It prints at most maxChars, truncating components as needed. See
// TestPrettyPrintRange for some examples.
Expand Down
42 changes: 42 additions & 0 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,20 @@ func supportedNatively(spec *execinfrapb.ProcessorSpec) error {
}
return nil

case spec.Core.JoinReader != nil:
if spec.Core.JoinReader.LookupColumns != nil || !spec.Core.JoinReader.LookupExpr.Empty() {
return errors.Newf("lookup join reader is unsupported in vectorized")
}
for i := range spec.Core.JoinReader.Table.Indexes {
if spec.Core.JoinReader.Table.Indexes[i].IsInterleaved() {
// Interleaved indexes are going to be removed anyway, so there is no
// point in handling the extra complexity. Just let the row engine
// handle this.
return errors.Newf("vectorized join reader is unsupported for interleaved indexes")
}
}
return nil

case spec.Core.Filterer != nil:
return nil

Expand Down Expand Up @@ -781,6 +795,34 @@ func NewColOperator(
result.ColumnTypes = scanOp.ResultTypes
result.ToClose = append(result.ToClose, scanOp)

case core.JoinReader != nil:
if err := checkNumIn(inputs, 1); err != nil {
return r, err
}
if core.JoinReader.LookupColumns != nil || !core.JoinReader.LookupExpr.Empty() {
return r, errors.Newf("lookup join reader is unsupported in vectorized")
}

semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn)
inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes))
copy(inputTypes, spec.Input[0].ColumnTypes)
indexJoinOp, err := colfetcher.NewColIndexJoin(
ctx, streamingAllocator, flowCtx, evalCtx, semaCtx,
inputs[0].Root, core.JoinReader, post, inputTypes)
if err != nil {
return r, err
}
result.Root = indexJoinOp
if util.CrdbTestBuild {
result.Root = colexec.NewInvariantsChecker(result.Root)
}
result.KVReader = indexJoinOp
result.MetadataSources = append(result.MetadataSources, result.Root.(colexecop.MetadataSource))
result.Releasables = append(result.Releasables, indexJoinOp)
result.Root = colexecutils.NewCancelChecker(result.Root)
result.ColumnTypes = indexJoinOp.ResultTypes
result.ToClose = append(result.ToClose, indexJoinOp)

case core.Filterer != nil:
if err := checkNumIn(inputs, 1); err != nil {
return r, err
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/colexec/colexecjoin/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ go_library(
"//pkg/col/coldata",
"//pkg/col/coldataext", # keep
"//pkg/col/typeconv",
"//pkg/keys", # keep
"//pkg/roachpb", # keep
"//pkg/sql/catalog", # keep
"//pkg/sql/catalog/descpb",
"//pkg/sql/colcontainer",
"//pkg/sql/colexec/colexecbase",
Expand All @@ -27,10 +30,12 @@ go_library(
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/rowenc", # keep
"//pkg/sql/sem/tree", # keep
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/duration", # keep
"//pkg/util/encoding", # keep
"//pkg/util/json", # keep
"//pkg/util/mon",
"@com_github_cockroachdb_apd_v2//:apd", # keep
Expand All @@ -45,25 +50,35 @@ go_test(
"dep_test.go",
"main_test.go",
"mergejoiner_test.go",
"span_assembler_test.go",
],
embed = [":colexecjoin"],
deps = [
"//pkg/col/coldata",
"//pkg/col/coldataext",
"//pkg/col/coldatatestutils",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/security",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/colconv",
"//pkg/sql/colexec/colexectestutils",
"//pkg/sql/colexecerror",
"//pkg/sql/colexecop",
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/span",
"//pkg/sql/types",
"//pkg/testutils/buildutil",
"//pkg/testutils/colcontainerutils",
"//pkg/testutils/skip",
"//pkg/util",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
Expand All @@ -87,6 +102,8 @@ targets = [
("mergejoiner_rightanti.eg.go", "mergejoiner_tmpl.go"),
("mergejoiner_rightouter.eg.go", "mergejoiner_tmpl.go"),
("mergejoiner_rightsemi.eg.go", "mergejoiner_tmpl.go"),
("span_assembler.eg.go", "span_assembler_tmpl.go"),
("span_encoder.eg.go", "span_encoder_tmpl.go"),
]

# Define a file group for all the .eg.go targets.
Expand Down
Loading

0 comments on commit dae3f86

Please sign in to comment.