sql: hint scan batch size by expected row count
Previously, the "dynamic batch size" strategy for the vectorized
engine's batch allocator worked the same in every situation: batches
would start at size 1, then double on each re-allocation, until they hit
their maximum size of 1024.

Now, to improve performance for scans that return between 1 and 1024 rows,
we pass the optimizer's best guess of the number of rows that the scan will
produce all the way down into the TableReader. That guess is used as the
initial batch size if it's less than 1024.

Release note (performance improvement): improved the performance of the
vectorized engine when scanning fewer than 1024 rows at a time.
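
As a rough, self-contained sketch of the strategy described above (this is not
the actual colmem/cFetcher code; maxBatchSize and nextBatchCapacity are names
made up for this illustration), the snippet below shows how an optimizer
estimate seeds the first batch while later reallocations keep doubling the
capacity up to the maximum:

package main

import "fmt"

// maxBatchSize stands in for coldata.BatchSize(), which defaults to 1024.
const maxBatchSize = 1024

// nextBatchCapacity is a hypothetical helper that sketches the dynamic batch
// size strategy: the first batch is sized from the optimizer's estimated row
// count, falling back to the maximum when there is no estimate or the estimate
// exceeds the maximum, and every later reallocation doubles the capacity until
// it reaches maxBatchSize.
func nextBatchCapacity(currentCapacity, estimatedRowCount int) int {
	if currentCapacity == 0 {
		// First allocation: seed the capacity from the estimate.
		if estimatedRowCount <= 0 || estimatedRowCount > maxBatchSize {
			return maxBatchSize
		}
		return estimatedRowCount
	}
	if 2*currentCapacity >= maxBatchSize {
		return maxBatchSize
	}
	return 2 * currentCapacity
}

func main() {
	// A scan estimated at 511 rows gets a single 511-row batch up front
	// instead of starting at 1 and doubling.
	fmt.Println(nextBatchCapacity(0, 511)) // 511
	// With no estimate, start with a full-size batch, matching the new
	// ResetMaybeReallocate handling of minCapacity == 0 further down.
	fmt.Println(nextBatchCapacity(0, 0)) // 1024
}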
jordanlewis committed Mar 20, 2021
1 parent 1753012 commit 3d7eecc
Showing 15 changed files with 291 additions and 99 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
@@ -177,6 +177,7 @@ ALL_TESTS = [
"//pkg/sql/colexec:colexec_test",
"//pkg/sql/colexecerror:colexecerror_test",
"//pkg/sql/colexecop:colexecop_test",
"//pkg/sql/colfetcher:colfetcher_test",
"//pkg/sql/colflow/colrpc:colrpc_test",
"//pkg/sql/colflow:colflow_test",
"//pkg/sql/colmem:colmem_test",
6 changes: 5 additions & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
@@ -735,7 +735,11 @@ func NewColOperator(
if err := checkNumIn(inputs, 0); err != nil {
return r, err
}
scanOp, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post)

estimatedRowCount := spec.EstimatedRowCount
scanOp, err := colfetcher.NewColBatchScan(
ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post, estimatedRowCount,
)
if err != nil {
return r, err
}
23 changes: 22 additions & 1 deletion pkg/sql/colfetcher/BUILD.bazel
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//build:STRINGER.bzl", "stringer")

go_library(
@@ -47,3 +47,24 @@ stringer(
src = "cfetcher.go",
typ = "fetcherState",
)

go_test(
name = "colfetcher_test",
srcs = [
"main_test.go",
"vectorized_batch_size_test.go",
],
deps = [
"//pkg/base",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_stretchr_testify//assert",
],
)
17 changes: 14 additions & 3 deletions pkg/sql/colfetcher/cfetcher.go
@@ -253,6 +253,10 @@ type cFetcher struct {
// fetcher is the underlying fetcher that provides KVs.
fetcher *row.KVFetcher

// estimatedRowCount is the optimizer-derived number of expected rows that
// this fetch will produce, if non-zero.
estimatedRowCount uint64

// machine contains fields that get updated during the run of the fetcher.
machine struct {
// state is the queue of next states of the state machine. The 0th entry
@@ -310,12 +314,19 @@ type cFetcher struct {
}
}

const cFetcherBatchMinCapacity = 1

func (rf *cFetcher) resetBatch(timestampOutputIdx, tableOidOutputIdx int) {
var reallocated bool
var estimatedRowCount int
// We need to transform our rf.estimatedRowCount, which is a uint64, into
// an int. We have to be careful: if we just cast it directly, a giant
// estimate will wrap around and become negative.
if rf.estimatedRowCount > uint64(coldata.BatchSize()) {
estimatedRowCount = coldata.BatchSize()
} else {
estimatedRowCount = int(rf.estimatedRowCount)
}
rf.machine.batch, reallocated = rf.allocator.ResetMaybeReallocate(
rf.typs, rf.machine.batch, cFetcherBatchMinCapacity, rf.memoryLimit,
rf.typs, rf.machine.batch, estimatedRowCount, rf.memoryLimit,
)
if reallocated {
rf.machine.colvecs = rf.machine.batch.ColVecs()
2 changes: 2 additions & 0 deletions pkg/sql/colfetcher/colbatch_scan.go
@@ -216,6 +216,7 @@ func NewColBatchScan(
evalCtx *tree.EvalContext,
spec *execinfrapb.TableReaderSpec,
post *execinfrapb.PostProcessSpec,
estimatedRowCount uint64,
) (*ColBatchScan, error) {
// NB: we hit this with a zero NodeID (but !ok) with multi-tenancy.
if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok {
@@ -264,6 +265,7 @@
}

fetcher := cFetcherPool.Get().(*cFetcher)
fetcher.estimatedRowCount = estimatedRowCount
if _, _, err := initCRowFetcher(
flowCtx.Codec(), allocator, execinfra.GetWorkMemLimit(flowCtx.Cfg),
fetcher, table, columnIdxMap, neededColumns, spec, spec.HasSystemColumns,
33 changes: 33 additions & 0 deletions pkg/sql/colfetcher/main_test.go
@@ -0,0 +1,33 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colfetcher_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go

func TestMain(m *testing.M) {
security.SetAssetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}
74 changes: 74 additions & 0 deletions pkg/sql/colfetcher/vectorized_batch_size_test.go
@@ -0,0 +1,74 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colfetcher_test

import (
"context"
"regexp"
"strconv"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/assert"
)

// TestScanBatchSize tests that the cfetcher's dynamic batch size algorithm
// uses the optimizer's estimated row count for its initial batch size. This
// test sets up a scan against a table with a known row count, and makes sure
// that the optimizer uses its statistics to produce an estimated row count that
// is equal to the number of rows in the table, allowing the fetcher to create
// a single batch for the scan.
func TestScanBatchSize(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderMetamorphic(t, "This test doesn't work with metamorphic batch sizes.")

testClusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationAuto,
}
tc := testcluster.StartTestCluster(t, 1, testClusterArgs)
ctx := context.Background()
defer tc.Stopper().Stop(ctx)

conn := tc.Conns[0]

_, err := conn.ExecContext(ctx, `CREATE TABLE t (a PRIMARY KEY) AS SELECT generate_series(1, 511)`)
assert.NoError(t, err)

rows, err := conn.QueryContext(ctx, `EXPLAIN ANALYZE (VERBOSE, DISTSQL) SELECT * FROM t`)
assert.NoError(t, err)
batchCountRegex := regexp.MustCompile(`vectorized batch count: (\d+)`)
var found bool
var sb strings.Builder
for rows.Next() {
var res string
assert.NoError(t, rows.Scan(&res))
sb.WriteString(res)
sb.WriteByte('\n')
matches := batchCountRegex.FindStringSubmatch(res)
if len(matches) == 0 {
continue
}
foundBatches, err := strconv.Atoi(matches[1])
assert.NoError(t, err)
assert.Equal(t, 1, foundBatches, "should use just 1 batch to scan 511 rows")
found = true
break
}
if !found {
t.Fatalf("expected to find a vectorized batch count; found nothing. text:\n%s", sb.String())
}
}
5 changes: 2 additions & 3 deletions pkg/sql/colmem/allocator.go
@@ -163,10 +163,9 @@ func (a *Allocator) NewMemBatchNoCols(typs []*types.T, capacity int) coldata.Bat
func (a *Allocator) ResetMaybeReallocate(
typs []*types.T, oldBatch coldata.Batch, minCapacity int, maxBatchMemSize int64,
) (newBatch coldata.Batch, reallocated bool) {
if minCapacity < 1 {
if minCapacity < 0 {
colexecerror.InternalError(errors.AssertionFailedf("invalid minCapacity %d", minCapacity))
}
if minCapacity > coldata.BatchSize() {
} else if minCapacity == 0 || minCapacity > coldata.BatchSize() {
minCapacity = coldata.BatchSize()
}
reallocated = true
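
To make the changed contract above concrete: callers may now pass 0 to mean
"no estimate", in which case a full-size batch is allocated, while negative
values remain an assertion failure. The following standalone sketch
(effectiveMinCapacity is a hypothetical name invented for this note, not part
of the patch) restates that clamping:

package main

import "fmt"

// effectiveMinCapacity mirrors the minCapacity handling in the hunk above:
// negative values are an internal error, zero ("no estimate") and values above
// the maximum batch size are clamped to the maximum, and anything else is used
// as the initial capacity directly.
func effectiveMinCapacity(minCapacity, maxBatchSize int) (int, error) {
	if minCapacity < 0 {
		return 0, fmt.Errorf("invalid minCapacity %d", minCapacity)
	}
	if minCapacity == 0 || minCapacity > maxBatchSize {
		return maxBatchSize, nil
	}
	return minCapacity, nil
}

func main() {
	for _, estimate := range []int{0, 511, 4096} {
		c, err := effectiveMinCapacity(estimate, 1024)
		if err != nil {
			panic(err)
		}
		fmt.Printf("estimate %d -> initial batch capacity %d\n", estimate, c)
	}
}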
1 change: 1 addition & 0 deletions pkg/sql/distsql_physical_planner.go
@@ -1325,6 +1325,7 @@ func (dsp *DistSQLPlanner) planTableReaders(
p.TotalEstimatedScannedRows += info.estimatedRowCount

corePlacement[i].NodeID = sp.Node
corePlacement[i].EstimatedRowCount = info.estimatedRowCount
corePlacement[i].Core.TableReader = tr
}

(Diffs for the remaining 6 changed files are not shown.)