Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
67450: colexec: implement vectorized index join r=DrewKimball a=DrewKimball

This patch provides a vectorized implementation of the index join
operator. Span generation is accomplished using two utility operators.
`spanEncoder` operates on a single index key column and fills a `Bytes`
column with the encoding of that column for each row. `spanAssembler`
takes the output of each `spanEncoder` and generates spans, accounting
for table/index prefixes and possibly splitting the spans over column
families. Finally, the `ColIndexJoin` operator uses the generated spans
to perform a lookup on the table's primary index, returns all batches
resulting from the lookup, and repeats until the input is fully consumed.

The `ColIndexJoin` operator queues up input rows until the memory footprint
of the rows reaches a preset limit (default 4MB for parity with the row
engine). This allows the cost of starting a scan to be amortized.

Fixes #65905

Release note (sql change): The vectorized execution engine can now
perform a scan over an index, and then join on the primary index to
retrieve the required columns.

68620: sql: reimplement sqlstats API using iterator r=Azhng a=Azhng

As we move to implement an aggregated virtual table of in-memory
sqlstats and persisted sqlstats, the current SQL Stats API makes it
difficult to implement such behaviour.
This commit introduces lower level iterator APIs that can be used
later for the aggregated virtual table.

Release note: None

68700: kv: remove special handling of learners in replicateQueue.shouldQueue r=nvanbenschoten a=nvanbenschoten

This was unnecessary due to the `action != AllocatorConsiderRebalance`
branch immediately below it. It was introduced in #39157 during a
refactor (see "I was trying to make it obvious" in comments) that was
subsequently reverted.

Co-authored-by: Drew Kimball <[email protected]>
Co-authored-by: Azhng <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
4 people committed Aug 11, 2021
4 parents 6314f90 + 5a6f53b + c6627e3 + bedb473 commit 6b2e2c4
Show file tree
Hide file tree
Showing 37 changed files with 4,561 additions and 277 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,8 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/colexecsel/default_cmp_sel_ops.eg.go \
pkg/sql/colexec/colexecsel/selection_ops.eg.go \
pkg/sql/colexec/colexecsel/sel_like_ops.eg.go \
pkg/sql/colexec/colexecspan/span_assembler.eg.go \
pkg/sql/colexec/colexecspan/span_encoder.eg.go \
pkg/sql/colexec/colexecwindow/first_value.eg.go \
pkg/sql/colexec/colexecwindow/lag.eg.go \
pkg/sql/colexec/colexecwindow/last_value.eg.go \
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ ALL_TESTS = [
"//pkg/sql/colexec/colexecjoin:colexecjoin_test",
"//pkg/sql/colexec/colexecproj:colexecproj_test",
"//pkg/sql/colexec/colexecsel:colexecsel_test",
"//pkg/sql/colexec/colexecspan:colexecspan_test",
"//pkg/sql/colexec/colexectestutils:colexectestutils_test",
"//pkg/sql/colexec/colexecutils:colexecutils_test",
"//pkg/sql/colexec/colexecwindow:colexecwindow_test",
Expand Down
22 changes: 22 additions & 0 deletions pkg/col/coldata/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ const FlatBytesOverhead = int64(unsafe.Sizeof(Bytes{}))

// Size returns the total size of the receiver in bytes.
func (b *Bytes) Size() int64 {
if b == nil {
return 0
}
return FlatBytesOverhead +
int64(cap(b.data)) +
int64(cap(b.offsets))*memsize.Int32
Expand All @@ -400,6 +403,16 @@ func (b *Bytes) ProportionalSize(n int64) int64 {
return FlatBytesOverhead + int64(len(b.data[b.offsets[0]:b.offsets[n]])) + n*memsize.Int32
}

// ElemSize reports how many bytes the element stored at position idx occupies,
// computed as the distance between its two neighbouring offsets.
// Panics if idx does not refer to an existing element, i.e. when it falls
// outside [0, b.Len()).
func (b *Bytes) ElemSize(idx int) int64 {
	if inBounds := idx >= 0 && idx < b.Len(); !inBounds {
		err := errors.AssertionFailedf("called ElemSize with invalid index: %d", idx)
		colexecerror.InternalError(err)
	}
	start, end := b.offsets[idx], b.offsets[idx+1]
	return int64(end - start)
}

// Abbreviated returns a uint64 slice where each uint64 represents the first
// eight bytes of each []byte. It is used for byte comparison fast paths.
//
Expand Down Expand Up @@ -456,6 +469,15 @@ func (b *Bytes) Reset() {
b.maxSetLength = 0
}

// ResetForAppend performs a Reset and additionally shrinks the offsets slice
// down to its first entry, so that subsequent AppendSlice or AppendVal calls
// begin appending at index zero.
// TODO(drewk): once Set is removed, this can just be Reset.
func (b *Bytes) ResetForAppend() {
	b.Reset()
	// Only the leading offset is retained; it marks the position at which the
	// first appended element will begin.
	b.offsets = b.offsets[0:1]
}

// Truncate truncates the underlying bytes to the given length. This allows Set
// to be called at or beyond the given length. If the length of the bytes is
// already less than or equal to the given length, Truncate is a no-op.
Expand Down
2 changes: 1 addition & 1 deletion pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func init() {
// PrettyPrintRange pretty prints a compact representation of a key range. The
// output is of the form:
// commonPrefix{remainingStart-remainingEnd}
// If the end key is empty, the outut is of the form:
// If the end key is empty, the output is of the form:
// start
// It prints at most maxChars, truncating components as needed. See
// TestPrettyPrintRange for some examples.
Expand Down
17 changes: 5 additions & 12 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,15 +234,6 @@ func (rq *replicateQueue) shouldQueue(
desc, zone := repl.DescAndZone()
action, priority := rq.allocator.ComputeAction(ctx, zone, desc)

// For simplicity, the first thing the allocator does is remove learners, so
// it can do all of its reasoning about only voters. We do the same here so
// the executions of the allocator's decisions can be in terms of voters.
if action == AllocatorRemoveLearner {
return true, priority
}
voterReplicas := desc.Replicas().VoterDescriptors()
nonVoterReplicas := desc.Replicas().NonVoterDescriptors()

if action == AllocatorNoop {
log.VEventf(ctx, 2, "no action to take")
return false, 0
Expand All @@ -251,6 +242,8 @@ func (rq *replicateQueue) shouldQueue(
return true, priority
}

voterReplicas := desc.Replicas().VoterDescriptors()
nonVoterReplicas := desc.Replicas().NonVoterDescriptors()
if !rq.store.TestingKnobs().DisableReplicaRebalancing {
rangeUsageInfo := rangeUsageInfoForRepl(repl)
_, _, _, ok := rq.allocator.RebalanceVoter(
Expand Down Expand Up @@ -548,7 +541,7 @@ func (rq *replicateQueue) addOrReplaceVoters(
// See about transferring the lease away if we're about to remove the
// leaseholder.
done, err := rq.maybeTransferLeaseAway(
ctx, repl, existingVoters[removeIdx].StoreID, dryRun, nil /* canTransferLease */)
ctx, repl, existingVoters[removeIdx].StoreID, dryRun, nil /* canTransferLeaseFrom */)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -805,7 +798,7 @@ func (rq *replicateQueue) findRemoveVoter(
// true to indicate to the caller that it should not pursue the current
// replication change further because it is no longer the leaseholder. When the
// returned bool is false, it should continue. On error, the caller should also
// stop. If canTransferLease is non-nil, it is consulted and an error is
// stop. If canTransferLeaseFrom is non-nil, it is consulted and an error is
// returned if it returns false.
func (rq *replicateQueue) maybeTransferLeaseAway(
ctx context.Context,
Expand Down Expand Up @@ -854,7 +847,7 @@ func (rq *replicateQueue) removeVoter(
return false, err
}
done, err := rq.maybeTransferLeaseAway(
ctx, repl, removeVoter.StoreID, dryRun, nil /* canTransferLease */)
ctx, repl, removeVoter.StoreID, dryRun, nil /* canTransferLeaseFrom */)
if err != nil {
return false, err
}
Expand Down
8 changes: 0 additions & 8 deletions pkg/roachpb/metadata_replicas.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,14 +535,6 @@ func CheckCanReceiveLease(wouldbeLeaseholder ReplicaDescriptor, rngDesc *RangeDe
//
// Since the leaseholder can't remove itself and is a VOTER_FULL, we
// also know that in any configuration there's at least one VOTER_FULL.
//
// TODO(tbg): if this code path is hit during a lease transfer (we check
// upstream of raft, but this check has false negatives) then we are in
// a situation where the leaseholder is a node that has set its
// minProposedTS and won't be using its lease any more. Either the setting
// of minProposedTS needs to be "reversible" (tricky) or we make the
// lease evaluation succeed, though with a lease that's "invalid" so that
// a new lease can be requested right after.
return errReplicaCannotHoldLease
}
return nil
Expand Down
77 changes: 61 additions & 16 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,20 @@ func supportedNatively(spec *execinfrapb.ProcessorSpec) error {
}
return nil

case spec.Core.JoinReader != nil:
if spec.Core.JoinReader.LookupColumns != nil || !spec.Core.JoinReader.LookupExpr.Empty() {
return errLookupJoinUnsupported
}
for i := range spec.Core.JoinReader.Table.Indexes {
if spec.Core.JoinReader.Table.Indexes[i].IsInterleaved() {
// Interleaved indexes are going to be removed anyway, so there is no
// point in handling the extra complexity. Just let the row engine
// handle this.
return errInterleavedIndexJoin
}
}
return nil

case spec.Core.Filterer != nil:
return nil

Expand Down Expand Up @@ -249,6 +263,8 @@ var (
errSampleAggregatorWrap = errors.New("core.SampleAggregator is not supported (not an execinfra.RowSource)")
errExperimentalWrappingProhibited = errors.New("wrapping for non-JoinReader and non-LocalPlanNode cores is prohibited in vectorize=experimental_always")
errWrappedCast = errors.New("mismatched types in NewColOperator and unsupported casts")
errLookupJoinUnsupported = errors.New("lookup join reader is unsupported in vectorized")
errInterleavedIndexJoin = errors.New("vectorized join reader is unsupported for interleaved indexes")
)

func canWrap(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.ProcessorSpec) error {
Expand Down Expand Up @@ -764,23 +780,32 @@ func NewColOperator(
if err != nil {
return r, err
}
result.Root = scanOp
if util.CrdbTestBuild {
result.Root = colexec.NewInvariantsChecker(result.Root)
}
result.KVReader = scanOp
result.MetadataSources = append(result.MetadataSources, result.Root.(colexecop.MetadataSource))
result.Releasables = append(result.Releasables, scanOp)
result.finishScanPlanning(scanOp, scanOp.ResultTypes)

// We want to check for cancellation once per input batch, and
// wrapping only colBatchScan with a CancelChecker allows us to do
// just that. It's sufficient for most of the operators since they
// are extremely fast. However, some of the long-running operators
// (for example, sorter) are still responsible for doing the
// cancellation check on their own while performing long operations.
result.Root = colexecutils.NewCancelChecker(result.Root)
result.ColumnTypes = scanOp.ResultTypes
result.ToClose = append(result.ToClose, scanOp)
case core.JoinReader != nil:
if err := checkNumIn(inputs, 1); err != nil {
return r, err
}
if core.JoinReader.LookupColumns != nil || !core.JoinReader.LookupExpr.Empty() {
return r, errors.AssertionFailedf("lookup join reader is unsupported in vectorized")
}
// We have to create a separate account in order for the cFetcher to
// be able to precisely track the size of its output batch. This
// memory account is "streaming" in its nature, so we create an
// unlimited one.
cFetcherMemAcc := result.createUnlimitedMemAccount(
ctx, flowCtx, "cfetcher" /* opName */, spec.ProcessorID,
)
semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn)
inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes))
copy(inputTypes, spec.Input[0].ColumnTypes)
indexJoinOp, err := colfetcher.NewColIndexJoin(
ctx, streamingAllocator, colmem.NewAllocator(ctx, cFetcherMemAcc, factory),
flowCtx, evalCtx, semaCtx, inputs[0].Root, core.JoinReader, post, inputTypes)
if err != nil {
return r, err
}
result.finishScanPlanning(indexJoinOp, indexJoinOp.ResultTypes)

case core.Filterer != nil:
if err := checkNumIn(inputs, 1); err != nil {
Expand Down Expand Up @@ -1752,6 +1777,26 @@ func (r opResult) finishBufferedWindowerArgs(
}
}

// finishScanPlanning wires a scan-like operator into the planning result. It
// installs op as the root (wrapped in an invariants checker when
// util.CrdbTestBuild is set), records op as the KVReader, registers the
// current root as a metadata source and op as both a releasable and a closer,
// wraps the root in a cancel checker, and sets the output column types to
// resultTypes.
//
// NOTE(review): r is received by value, so these assignments reach the caller
// only if opResult shares its state through a pointer — confirm against the
// opResult definition, which is not visible here.
func (r opResult) finishScanPlanning(op colfetcher.ScanOperator, resultTypes []*types.T) {
r.Root = op
if util.CrdbTestBuild {
r.Root = colexec.NewInvariantsChecker(r.Root)
}
r.KVReader = op
// The root is registered as it stands at this point: after the optional
// invariants checker wrap, but before the CancelChecker wrap below.
r.MetadataSources = append(r.MetadataSources, r.Root.(colexecop.MetadataSource))
r.Releasables = append(r.Releasables, op)

// We want to check for cancellation once per input batch, and
// wrapping only the scan operator with a CancelChecker allows us to do
// just that. It's sufficient for most of the operators since they
// are extremely fast. However, some of the long-running operators
// (for example, sorter) are still responsible for doing the
// cancellation check on their own while performing long operations.
r.Root = colexecutils.NewCancelChecker(r.Root)
r.ColumnTypes = resultTypes
r.ToClose = append(r.ToClose, op)
}

// planFilterExpr creates all operators to implement filter expression.
func planFilterExpr(
ctx context.Context,
Expand Down
83 changes: 83 additions & 0 deletions pkg/sql/colexec/colexecspan/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//pkg/sql/colexecop:EXECGEN.bzl", "eg_go_filegroup", "gen_eg_go_rules")

go_library(
name = "colexecspan",
srcs = [
":gen-exec", # keep
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecspan", # keep
visibility = ["//visibility:public"],
deps = [
"//pkg/col/coldata", # keep
"//pkg/col/coldataext", # keep
"//pkg/col/typeconv", # keep
"//pkg/keys", # keep
"//pkg/roachpb:with-mocks", # keep
"//pkg/sql/catalog", # keep
"//pkg/sql/catalog/descpb", # keep
"//pkg/sql/colexecerror", # keep
"//pkg/sql/colmem", # keep
"//pkg/sql/execinfra", # keep
"//pkg/sql/rowenc", # keep
"//pkg/sql/sem/tree", # keep
"//pkg/sql/types", # keep
"//pkg/util", # keep
"//pkg/util/duration", # keep
"//pkg/util/encoding", # keep
"@com_github_cockroachdb_apd_v2//:apd", # keep
"@com_github_cockroachdb_errors//:errors", # keep
],
)

go_test(
name = "colexecspan_test",
srcs = [
"dep_test.go",
"main_test.go",
"span_assembler_test.go",
],
embed = [":colexecspan"], # keep
deps = [
"//pkg/col/coldata",
"//pkg/col/coldataext",
"//pkg/col/coldatatestutils",
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/colconv",
"//pkg/sql/colexec/colexectestutils",
"//pkg/sql/colexecerror",
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/span",
"//pkg/sql/types",
"//pkg/testutils/buildutil",
"//pkg/testutils/skip",
"//pkg/util",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
],
)

# Map between target name and relevant template.
targets = [
("span_assembler.eg.go", "span_assembler_tmpl.go"),
("span_encoder.eg.go", "span_encoder_tmpl.go"),
]

# Define a file group for all the .eg.go targets.
eg_go_filegroup(
name = "gen-exec",
targets = targets,
)

# Define gen rules for individual eg.go files.
gen_eg_go_rules(targets)
31 changes: 31 additions & 0 deletions pkg/sql/colexec/colexecspan/dep_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexecspan

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/testutils/buildutil"
)

// TestNoLinkForbidden hands the colexecspan package path, together with a list
// of sibling colexec packages, to buildutil.VerifyNoImports.
func TestNoLinkForbidden(t *testing.T) {
	const pkgPath = "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecspan"
	// Packages passed to the checker as the forbidden import list.
	forbiddenPkgs := []string{
		"github.com/cockroachdb/cockroach/pkg/sql/colexec",
		"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecagg",
		"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexechash",
		"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecjoin",
		"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecproj",
		"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecsel",
	}
	buildutil.VerifyNoImports(t, pkgPath, true, forbiddenPkgs, nil)
}
40 changes: 40 additions & 0 deletions pkg/sql/colexec/colexecspan/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexecspan

import (
"flag"
"fmt"
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

// TestMain is the entry point for this package's tests. It calls
// randutil.SeedForTests, parses flags, and, unless running under benchmarks,
// installs a generated coldata batch size before running the tests. The
// process exits with the code returned by m.Run.
func TestMain(m *testing.M) {
	randutil.SeedForTests()
	// The setup lives in a closure so that its return value is fully computed
	// before os.Exit is invoked.
	runTests := func() int {
		flag.Parse()
		if skip.UnderBench() {
			// (If we're running benchmarks, don't set a random batch size.)
			return m.Run()
		}
		batchSize := colexectestutils.GenerateBatchSize()
		fmt.Printf("coldata.BatchSize() is set to %d\n", batchSize)
		if err := coldata.SetBatchSizeForTests(batchSize); err != nil {
			colexecerror.InternalError(err)
		}
		return m.Run()
	}
	os.Exit(runTests())
}
Loading

0 comments on commit 6b2e2c4

Please sign in to comment.